#!/usr/bin/env python # -*- coding: utf-8 -*- # Copyright (C) 2010-2017 Étienne Loks # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as # published by the Free Software Foundation, either version 3 of the # License, or (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . # See the file COPYING for details. import csv import json import os import shutil import tempfile import urllib import zipfile from ajax_select import make_ajax_form from ajax_select.fields import AutoCompleteSelectField, \ AutoCompleteSelectMultipleField from django.conf import settings from django.conf.urls import url from django.contrib import admin, messages from django.contrib.admin.views.main import ChangeList from django.contrib.auth.admin import GroupAdmin, UserAdmin from django.contrib.auth.models import Group, User from django.contrib.contenttypes.models import ContentType from django.contrib.sites.admin import SiteAdmin from django.contrib.sites.models import Site from django.contrib.gis.forms import PointField, OSMWidget, MultiPolygonField from django.contrib.gis.geos import GEOSGeometry, MultiPolygon from django.contrib.gis.gdal.error import GDALException from django.contrib.gis.geos.error import GEOSException from django.core.cache import cache from django.core.urlresolvers import reverse from django.db.models.fields import BooleanField, IntegerField, FloatField, \ CharField, FieldDoesNotExist from django.db.models.fields.related import ForeignKey from django.forms import BaseInlineFormSet from django.http import HttpResponseRedirect, HttpResponse from django.shortcuts import render from django.utils.decorators import method_decorator from django.utils.text import slugify, mark_safe from django.utils.translation import ugettext_lazy as _ from django.views.decorators.csrf import csrf_protect from django import forms from ishtar_common import models from ishtar_common.apps import admin_site from ishtar_common.utils import get_cache, create_slug from ishtar_common import forms as common_forms from archaeological_files import forms as file_forms from archaeological_files_pdl import forms as file_pdl_forms from archaeological_operations import forms as operation_forms from archaeological_context_records import forms as context_record_forms from archaeological_finds import forms as find_forms, \ forms_treatments as treatment_forms from archaeological_warehouse import forms as warehouse_forms csrf_protect_m = method_decorator(csrf_protect) ISHTAR_FORMS = [common_forms, file_pdl_forms, file_forms, operation_forms, context_record_forms, find_forms, treatment_forms, warehouse_forms] class ImportGenericForm(forms.Form): csv_file = forms.FileField( _(u"CSV file"), help_text=_(u"Only unicode encoding is managed - convert your" u" file first") ) def change_value(attribute, value, description): """ Action to change a specific value in a list """ def _change_value(modeladmin, request, queryset): for obj in queryset.order_by('pk'): setattr(obj, attribute, value) obj.save() url = reverse( 'admin:%s_%s_changelist' % ( modeladmin.model._meta.app_label, modeladmin.model._meta.model_name) ) + '?' + urllib.urlencode(request.GET) return HttpResponseRedirect(url) _change_value.short_description = description _change_value.__name__ = str(slugify(description)) return _change_value def export_as_csv_action(description=_(u"Export selected as CSV file"), fields=None, exclude=None, header=True): """ This function returns an export csv action 'fields' and 'exclude' work like in django ModelForm 'header' is whether or not to output the column names as the first row """ def export_as_csv(modeladmin, request, queryset): """ Generic csv export admin action. based on http://djangosnippets.org/snippets/1697/ """ opts = modeladmin.model._meta field_names = set([field.name for field in opts.fields]) if fields: fieldset = set(fields) field_names = field_names & fieldset elif exclude: excludeset = set(exclude) field_names = field_names - excludeset response = HttpResponse(content_type='text/csv') response['Content-Disposition'] = 'attachment; filename=%s.csv' % \ unicode(opts).replace('.', '_') writer = csv.writer(response) if header: writer.writerow(list(field_names)) for obj in queryset.order_by('pk'): row = [] for field in field_names: value = getattr(obj, field) if hasattr(value, 'txt_idx'): value = getattr(value, 'txt_idx') elif hasattr(value, 'slug'): value = getattr(value, 'txt_idx') elif value is None: value = "" else: value = unicode(value).encode("utf-8", "replace") row.append(value) writer.writerow(row) return response export_as_csv.short_description = description return export_as_csv class HistorizedObjectAdmin(admin.ModelAdmin): readonly_fields = ['history_creator', 'history_modifier', 'search_vector'] def save_model(self, request, obj, form, change): obj.history_modifier = request.user obj.save() def get_readonly_fields(self, request, obj=None): if obj: # editing an existing object return tuple(self.readonly_fields or []) + tuple(['imports']) return self.readonly_fields def get_exclude(self, request, obj=None): if not obj: return tuple(self.exclude or []) + tuple(['imports']) return self.exclude class MyGroupAdmin(GroupAdmin): class Media: css = { "all": ("media/admin.css",) } admin_site.register(User, UserAdmin) admin_site.register(Group, MyGroupAdmin) admin_site.register(Site, SiteAdmin) class AdminIshtarSiteProfileForm(forms.ModelForm): class Meta: model = models.IshtarSiteProfile exclude = [] default_center = PointField(label=_(u"Maps - default center"), widget=OSMWidget) class IshtarSiteProfileAdmin(admin.ModelAdmin): list_display = ('label', 'slug', 'active', 'files', 'context_record', 'find', 'warehouse', 'mapping', 'preservation') model = models.IshtarSiteProfile form = AdminIshtarSiteProfileForm admin_site.register(models.IshtarSiteProfile, IshtarSiteProfileAdmin) class DepartmentAdmin(admin.ModelAdmin): list_display = ('number', 'label',) model = models.Department admin_site.register(models.Department, DepartmentAdmin) class OrganizationAdmin(HistorizedObjectAdmin): list_display = ('pk', 'name', 'organization_type') list_filter = ("organization_type",) search_fields = ('name',) exclude = ('merge_key', 'merge_exclusion', 'merge_candidate', ) model = models.Organization admin_site.register(models.Organization, OrganizationAdmin) class ProfileInline(admin.TabularInline): model = models.UserProfile verbose_name = _(u"Profile") verbose_name_plural = _(u"Profiles") extra = 1 class PersonAdmin(HistorizedObjectAdmin): list_display = ('pk', 'name', 'surname', 'raw_name', 'email') list_filter = ("person_types",) search_fields = ('name', 'surname', 'email', 'raw_name') exclude = ('merge_key', 'merge_exclusion', 'merge_candidate', ) form = make_ajax_form(models.Person, {'attached_to': 'organization'}) model = models.Person inlines = [ProfileInline] admin_site.register(models.Person, PersonAdmin) class AuthorAdmin(admin.ModelAdmin): list_display = ['person', 'author_type'] list_filter = ("author_type",) search_fields = ('person__name', 'person__surname', 'person__attached_to__name') model = models.Author form = make_ajax_form(models.Author, {'person': 'person'}) admin_site.register(models.Author, AuthorAdmin) class GlobalVarAdmin(admin.ModelAdmin): list_display = ['slug', 'description', 'value'] admin_site.register(models.GlobalVar, GlobalVarAdmin) class ChangeListForChangeView(ChangeList): def get_filters_params(self, params=None): """ Get the current list queryset parameters from _changelist_filters """ filtered_params = {} lookup_params = super( ChangeListForChangeView, self).get_filters_params(params) if '_changelist_filters' in lookup_params: field_names = [field.name for field in self.model._meta.get_fields()] params = lookup_params.pop('_changelist_filters') for param in params.split("&"): key, value = param.split("=") if not value or key not in field_names: continue filtered_params[key] = value return filtered_params class ImportActionAdmin(admin.ModelAdmin): change_list_template = "admin/gen_change_list.html" import_keys = ['slug', 'txt_idx'] def get_urls(self): urls = super(ImportActionAdmin, self).get_urls() my_urls = [ url(r'^import-from-csv/$', self.import_generic), ] return my_urls + urls def import_generic(self, request): form = None if 'apply' in request.POST: form = ImportGenericForm(request.POST, request.FILES) if form.is_valid(): csv_file = request.FILES['csv_file'] reader = csv.DictReader(csv_file) created, updated, missing_parent = 0, 0, [] for row in reader: slug_col = None for key in self.import_keys: if key in row: slug_col = key break if not slug_col: self.message_user( request, unicode(_(u"The CSV file should at least have a " u"{} column")).format( u"/".join(self.import_keys))) return slug = row.pop(slug_col) if 'id' in row: row.pop('id') if 'pk' in row: row.pop('pk') for k in row.keys(): value = row[k] if value == 'None': value = '' try: field = self.model._meta.get_field(k) except FieldDoesNotExist: row.pop(k) continue if isinstance(field, CharField): if not value: value = u"" else: value = value.decode('utf-8') if isinstance(field, IntegerField): if not value: value = None else: value = int(value) elif isinstance(field, FloatField): if not value: value = None else: value = float(value) elif isinstance(field, BooleanField): if value in ('true', 'True', '1'): value = True elif value in ('false', 'False', '0'): value = False else: value = None elif isinstance(field, ForeignKey): if not value: value = None else: model = field.rel.to try: value = model.objects.get( **{slug_col: value} ) except model.DoesNotExist: missing_parent.append(row.pop(k)) continue row[k] = value values = { slug_col: slug, 'defaults': row } obj, c = self.model.objects.get_or_create( **values) if not c: updated += 1 self.model.objects.filter(pk=obj.pk).update(**row) else: created += 1 if created: self.message_user( request, unicode(_(u"%d item(s) created.")) % created) if updated: self.message_user( request, unicode(_(u"%d item(s) updated.")) % updated) if missing_parent: self.message_user( request, unicode(_(u"These parents are missing: {}")).format( u" ; ".join(missing_parent) )) url = reverse( 'admin:%s_%s_changelist' % ( self.model._meta.app_label, self.model._meta.model_name) ) return HttpResponseRedirect(url) if not form: form = ImportGenericForm() return render( request, 'admin/import_from_file.html', {'file_form': form, 'current_action': 'import_generic'} ) class ImportGeoJsonForm(forms.Form): json_file = forms.FileField( _(u"Geojson file"), help_text=_(u"Only unicode encoding is managed - convert your" u" file first. The file must be a geojson file or a zip " u"containing a geojson file.") ) numero_insee_prefix = forms.CharField( label=_(u"Prefix for numero INSEE"), max_length=20, required=False) numero_insee_name = forms.CharField( label=_(u"Field name for numero INSEE"), max_length=200, initial='code_insee') name_name = forms.CharField( label=_(u"Field name for name"), max_length=200, initial='nom') UNIT_CHOICES = (('1', _(u"m2")), ('1000', _(u"km2"))) surface_unit = forms.ChoiceField( label=_(u"Surface unit"), choices=UNIT_CHOICES) surface_name = forms.CharField( label=_(u"Field name for surface"), max_length=200, required=False) year_name = forms.CharField( label=_(u"Field name for year"), max_length=200, required=False ) class ImportGEOJSONActionAdmin(object): def get_urls(self): urls = super(ImportGEOJSONActionAdmin, self).get_urls() my_urls = [ url(r'^import-from-geojson/$', self.import_geojson), ] return my_urls + urls def geojson_values(self, request, idx, feature, keys, trace_error=True): for key in ("geometry", "properties"): if key in feature: continue if trace_error: error = unicode( _(u"\"{}\" not found in feature {}") ).format(key, idx) self.message_user(request, error, level=messages.ERROR) return False values = {} for key in keys: if not key.endswith('_name'): continue if not keys[key]: values[key[:-len("_name")]] = None continue if keys[key] not in feature['properties']: if trace_error: error = unicode( _(u"\"{}\" not found in properties of feature {}") ).format(keys[key], idx) self.message_user(request, error, level=messages.ERROR) return False value = feature['properties'][keys[key]] if key == 'numero_insee_name' and keys['insee_prefix']: value = keys['insee_prefix'] + value values[key[:-len("_name")]] = value try: geom = GEOSGeometry(json.dumps(feature['geometry'])) except (GEOSException, GDALException): if trace_error: error = unicode( _(u"Bad geometry for feature {}") ).format(idx) self.message_user(request, error, level=messages.ERROR) return False if geom.geom_type == 'Point': values['center'] = geom elif geom.geom_type == 'MultiPolygon': values['limit'] = geom elif geom.geom_type == 'Polygon': values['limit'] = MultiPolygon(geom) else: if trace_error: error = unicode( _(u"Geometry {} not managed for towns - feature {}") ).format(geom.geom_type, idx) self.message_user(request, error, level=messages.ERROR) return False if keys['surface_unit'] and values['surface']: try: values['surface'] = keys['surface_unit'] * int( values['surface']) except ValueError: if trace_error: error = unicode( _(u"Bad value for surface: {} - feature {}") ).format(values['surface'], idx) self.message_user(request, error, level=messages.ERROR) return False return values def import_geojson_clean(self, tempdir): if not tempdir: return shutil.rmtree(tempdir) def import_geojson_error(self, request, error, base_dct, tempdir=None): self.import_geojson_clean(tempdir) self.message_user(request, error, level=messages.ERROR) return render( request, 'admin/import_from_file.html', base_dct) def import_geojson(self, request): form = None if 'apply' in request.POST: form = ImportGeoJsonForm(request.POST, request.FILES) if form.is_valid(): json_file_obj = request.FILES['json_file'] base_dct = {'file_form': form, 'current_action': 'import_geojson'} tempdir = tempfile.mkdtemp() tmpfilename = tempdir + os.sep + "dest_file" tmpfile = open(tmpfilename, 'wb+') for chunk in json_file_obj.chunks(): tmpfile.write(chunk) tmpfile.close() json_filename = None if zipfile.is_zipfile(tmpfilename): zfile = zipfile.ZipFile(tmpfilename) for zmember in zfile.namelist(): if os.sep in zmember or u".." in zmember: continue if zmember.endswith("json"): zfile.extract(zmember, tempdir) json_filename = tempdir + os.sep + zmember break if not json_filename: error = _(u"No json file found in zipfile") return self.import_geojson_error(request, error, base_dct, tempdir) else: json_filename = tmpfilename keys = { "numero_insee_name": request.POST.get('numero_insee_name'), "name_name": request.POST.get('name_name'), "surface_name": request.POST.get('surface_name', "") or "", "year_name": request.POST.get('year_name', "") or "", "insee_prefix": request.POST.get('numero_insee_prefix', None) or '', "surface_unit": int(request.POST.get('surface_unit')) } created = 0 updated = 0 with open(json_filename) as json_file_obj: json_file = json_file_obj.read() try: dct = json.loads(json_file) assert 'features' in dct assert dct['features'] except (ValueError, AssertionError): error = _(u"Bad geojson file") return self.import_geojson_error( request, error, base_dct, tempdir) error_count = 0 for idx, feat in enumerate(dct['features']): trace_error = True if error_count == 6: self.message_user(request, _(u"Too many errors..."), level=messages.ERROR) if error_count > 5: trace_error = False values = self.geojson_values( request, idx + 1, feat, keys, trace_error ) if not values: error_count += 1 continue num_insee = values.pop('numero_insee') year = values.pop('year') or None t, c = models.Town.objects.get_or_create( numero_insee=num_insee, year=year, defaults=values) if c: created += 1 else: modified = False for k in values: if values[k] != getattr(t, k): setattr(t, k, values[k]) modified = True if modified: updated += 1 t.save() if created: self.message_user( request, unicode(_(u"%d item(s) created.")) % created) if updated: self.message_user( request, unicode(_(u"%d item(s) updated.")) % updated) self.import_geojson_clean(tempdir) url = reverse( 'admin:%s_%s_changelist' % ( self.model._meta.app_label, self.model._meta.model_name) ) return HttpResponseRedirect(url) if not form: form = ImportGeoJsonForm() return render( request, 'admin/import_from_file.html', {'file_form': form, 'current_action': 'import_geojson'}) class AdminRelatedTownForm(forms.ModelForm): class Meta: model = models.Town.children.through exclude = [] from_town = AutoCompleteSelectField( 'town', required=True, label=_(u"Parent")) class AdminTownForm(forms.ModelForm): class Meta: model = models.Town exclude = ['imports', 'departement'] center = PointField(label=_(u"Center"), required=False, widget=OSMWidget) limit = MultiPolygonField(label=_(u"Limit"), required=False, widget=OSMWidget) children = AutoCompleteSelectMultipleField('town', required=False, label=_(u"Town children")) class TownParentInline(admin.TabularInline): model = models.Town.children.through fk_name = 'to_town' form = AdminRelatedTownForm verbose_name = _(u"Parent") verbose_name_plural = _(u"Parents") extra = 1 class TownAdmin(ImportGEOJSONActionAdmin, ImportActionAdmin): change_list_template = "admin/town_change_list.html" model = models.Town list_display = ['name', 'year'] search_fields = ['name'] readonly_fields = ['cached_label'] if settings.COUNTRY == 'fr': list_display += ['numero_insee'] search_fields += ['numero_insee', 'areas__label'] list_filter = ("areas",) form = AdminTownForm inlines = [TownParentInline] actions = [export_as_csv_action(exclude=['center', 'limit'])] import_keys = ['slug', 'txt_idx', 'numero_insee'] admin_site.register(models.Town, TownAdmin) class GeneralTypeAdmin(ImportActionAdmin): list_display = ['label', 'txt_idx', 'available', 'comment'] search_fields = ('label', 'txt_idx', 'comment',) list_filter = ('available',) save_on_top = True actions = [export_as_csv_action()] prepopulated_fields = {"txt_idx": ("label",)} @csrf_protect_m def get_changelist_queryset(self, request): """ Get the changelist queryset to be used in the change view. Used by previous and next button. Mainly a copy from: django/contrib/admin/options.py ModelAdmin->changelist_view """ list_display = self.get_list_display(request) list_display_links = self.get_list_display_links(request, list_display) list_filter = self.get_list_filter(request) search_fields = self.get_search_fields(request) list_select_related = self.get_list_select_related(request) cl = ChangeListForChangeView( request, self.model, list_display, list_display_links, list_filter, self.date_hierarchy, search_fields, list_select_related, self.list_per_page, self.list_max_show_all, self.list_editable, self, ) return cl.get_queryset(request) def change_view(self, request, object_id, form_url='', extra_context=None): """ Next and previous button on the change view """ if not extra_context: extra_context = {} ids = list(self.get_changelist_queryset(request).values('pk')) previous, current_is_reached, first = None, False, None extra_context['get_attr'] = "" if request.GET: extra_context['get_attr'] = "?" + request.GET.urlencode() for v in ids: pk = str(v['pk']) if pk == object_id: current_is_reached = True if previous: extra_context['previous_item'] = previous elif current_is_reached: extra_context['next_item'] = pk break else: if not first: first = pk previous = pk if 'previous_item' not in extra_context and \ 'next_item' not in extra_context and first: # on modify current object do not match current criteria # next is the first item extra_context['next_item'] = first return super(GeneralTypeAdmin, self).change_view( request, object_id, form_url, extra_context) general_models = [models.OrganizationType, models.SourceType, models.AuthorType, models.TitleType, models.Format, models.SupportType, models.PersonType, models.LicenseType] for model in general_models: admin_site.register(model, GeneralTypeAdmin) class AreaAdmin(GeneralTypeAdmin): list_display = ('label', 'reference', 'parent', 'available') search_fields = ('label', 'reference') list_filter = ('parent',) model = models.Area form = make_ajax_form( model, {'towns': 'town'} ) admin_site.register(models.Area, AreaAdmin) class ProfileTypeAdmin(GeneralTypeAdmin): model = models.ProfileType filter_vertical = ('groups',) admin_site.register(models.ProfileType, ProfileTypeAdmin) class ProfileTypeSummaryAdmin(admin.ModelAdmin): change_list_template = 'admin/profiletype_summary_change_list.html' search_fields = ('label',) list_filter = ('available', 'label') def has_add_permission(self, request, obj=None): return False def changelist_view(self, request, extra_context=None): response = super(ProfileTypeSummaryAdmin, self).changelist_view( request, extra_context=extra_context, ) try: qs = response.context_data["cl"].queryset except (AttributeError, KeyError): return response profile_types = list( qs.order_by("label") ) rights = {} for profile_type in profile_types: rights[profile_type.pk] = [g.pk for g in profile_type.groups.all()] groups = [] ok = mark_safe( u'True'.format( settings.STATIC_URL )) for group in models.Group.objects.order_by("name"): gp = [group.name] for profile_type in profile_types: gp.append( ok if group.pk in rights[profile_type.pk] else "-") groups.append(gp) response.context_data.update({"profile_types": profile_types, "groups": groups}) return response admin_site.register(models.ProfileTypeSummary, ProfileTypeSummaryAdmin) class ImporterDefaultValuesInline(admin.TabularInline): model = models.ImporterDefaultValues class ImporterDefaultAdmin(admin.ModelAdmin): list_display = ('importer_type', 'target') model = models.ImporterDefault inlines = (ImporterDefaultValuesInline,) admin_site.register(models.ImporterDefault, ImporterDefaultAdmin) def duplicate_importertype(modeladmin, request, queryset): res = [] for obj in queryset.order_by('pk'): old_pk = obj.pk obj.pk = None obj.slug = create_slug(models.ImporterType, obj.name) obj.name = obj.name + u" - duplicate" obj.name = obj.name[:200] obj.save() # create new old_obj = modeladmin.model.objects.get(pk=old_pk) for m in old_obj.created_models.all(): obj.created_models.add(m) for u in old_obj.users.all(): obj.users.add(u) for default in old_obj.defaults.all(): default_pk = default.pk default.pk = None # create new default.importer_type = obj default.save() old_default = default.__class__.objects.get(pk=default_pk) for def_value in old_default.default_values.all(): def_value.pk = None def_value.default_target = default def_value.save() for col in old_obj.columns.all(): col_pk = col.pk col.pk = None # create new col.importer_type = obj col.save() old_col = col.__class__.objects.get(pk=col_pk) for df in old_col.duplicate_fields.all(): df.pk = None # create new df.column = col df.save() for tg in old_col.targets.all(): tg.pk = None # create new tg.column = col tg.save() res.append(unicode(obj)) messages.add_message( request, messages.INFO, unicode(_(u"{} importer type(s) duplicated: {}.")).format( queryset.count(), u" ; ".join(res)) ) url = reverse( 'admin:%s_%s_changelist' % ( modeladmin.model._meta.app_label, modeladmin.model._meta.model_name) ) + '?' + urllib.urlencode(request.GET) return HttpResponseRedirect(url) duplicate_importertype.short_description = _(u"Duplicate") class ImporterTypeAdmin(admin.ModelAdmin): list_display = ('name', 'associated_models', 'available') actions = [duplicate_importertype] admin_site.register(models.ImporterType, ImporterTypeAdmin) class RegexpAdmin(admin.ModelAdmin): list_display = ('name', 'description', "regexp") admin_site.register(models.Regexp, RegexpAdmin) def duplicate_importercolumn(modeladmin, request, queryset): res = [] for col in queryset.order_by('col_number'): old_pk = col.pk col.pk = None col.label = (col.label or u"") + u" - duplicate" col.label = col.label[:200] # get the next available col number col_nb = col.col_number + 1 while modeladmin.model.objects.filter( col_number=col_nb, importer_type=col.importer_type).count(): col_nb += 1 col.col_number = col_nb col.save() # create new old_col = modeladmin.model.objects.get(pk=old_pk) for df in old_col.duplicate_fields.all(): df.pk = None # create new df.column = col df.save() for tg in old_col.targets.all(): tg.pk = None # create new tg.column = col tg.save() res.append(unicode(col)) messages.add_message( request, messages.INFO, unicode(_(u"{} importer column(s) duplicated: {}.")).format( queryset.count(), u" ; ".join(res)) ) url = reverse( 'admin:%s_%s_changelist' % ( modeladmin.model._meta.app_label, modeladmin.model._meta.model_name) ) + '?' + urllib.urlencode(request.GET) return HttpResponseRedirect(url) duplicate_importercolumn.short_description = _(u"Duplicate") def shift_right(modeladmin, request, queryset): for col in queryset.order_by('-col_number'): # get the next available col number col_nb = col.col_number + 1 while modeladmin.model.objects.filter( col_number=col_nb, importer_type=col.importer_type).count(): col_nb += 1 col.col_number = col_nb col.save() messages.add_message( request, messages.INFO, unicode(_(u"{} importer column(s) right-shifted.")).format( queryset.count()) ) url = reverse( 'admin:%s_%s_changelist' % ( modeladmin.model._meta.app_label, modeladmin.model._meta.model_name) ) + '?' + urllib.urlencode(request.GET) # for Python 3, use urllib.parse.urlencode return HttpResponseRedirect(url) shift_right.short_description = _(u"Shift right") def shift_left(modeladmin, request, queryset): errors, oks = 0, 0 for col in queryset.order_by('col_number'): # get the next available col number if col.col_number == 1: errors += 1 continue col_nb = col.col_number - 1 error = False while modeladmin.model.objects.filter( col_number=col_nb, importer_type=col.importer_type).count(): col_nb -= 1 if col_nb <= 0: errors += 1 continue col.col_number = col_nb col.save() oks += 1 if oks: messages.add_message( request, messages.INFO, unicode(_(u"{} importer column(s) left-shifted.")).format( oks) ) if errors: messages.add_message( request, messages.ERROR, unicode(_(u"{} importer column(s) not left-shifted: no " u"place available.")).format( errors) ) url = reverse( 'admin:%s_%s_changelist' % ( modeladmin.model._meta.app_label, modeladmin.model._meta.model_name) ) + '?' + urllib.urlencode(request.GET) # for Python 3, use urllib.parse.urlencode return HttpResponseRedirect(url) shift_left.short_description = _(u"Shift left") class ImporterDuplicateFieldInline(admin.TabularInline): model = models.ImporterDuplicateField class ImportTargetForm(forms.ModelForm): class Meta: model = models.ImportTarget exclude = [] widgets = { 'comment': forms.TextInput } class ImportTargetInline(admin.TabularInline): model = models.ImportTarget extra = 1 form = ImportTargetForm class ImporterColumnAdmin(admin.ModelAdmin): list_display = ('label', 'importer_type', 'col_number', 'col_string', 'description', 'targets_lbl', 'duplicate_fields_lbl', 'required') list_filter = ('importer_type',) inlines = (ImportTargetInline, ImporterDuplicateFieldInline) actions = [duplicate_importercolumn, shift_left, shift_right] admin_site.register(models.ImporterColumn, ImporterColumnAdmin) class ImporterModelAdmin(admin.ModelAdmin): list_display = ('name', 'klass') model = models.ImporterModel admin_site.register(models.ImporterModel, ImporterModelAdmin) class FormaterTypeAdmin(admin.ModelAdmin): list_display = ('formater_type', 'options') admin_site.register(models.FormaterType, FormaterTypeAdmin) class ImportAdmin(admin.ModelAdmin): list_display = ('name', 'importer_type', 'imported_file', 'user', 'state', 'creation_date') form = make_ajax_form(models.Import, {'user': 'ishtaruser'}) admin_site.register(models.Import, ImportAdmin) class TargetKeyGroupAdmin(admin.ModelAdmin): list_display = ('name', 'all_user_can_use', 'all_user_can_modify', 'available') search_fields = ('name',) admin_site.register(models.TargetKeyGroup, TargetKeyGroupAdmin) class TargetKeyAdmin(admin.ModelAdmin): list_display = ('target', 'importer_type', 'column_nb', 'key', 'value', 'is_set') list_filter = ("is_set", "target__column__importer_type") search_fields = ('target__target', 'value', 'key') admin_site.register(models.TargetKey, TargetKeyAdmin) class OperationTypeAdmin(GeneralTypeAdmin): list_display = GeneralTypeAdmin.list_display + ['order', 'preventive'] model = models.OperationType admin_site.register(models.OperationType, OperationTypeAdmin) class SpatialReferenceSystemAdmin(GeneralTypeAdmin): list_display = GeneralTypeAdmin.list_display + ['order', 'srid'] model = models.SpatialReferenceSystem admin_site.register(models.SpatialReferenceSystem, SpatialReferenceSystemAdmin) class ItemKeyAdmin(admin.ModelAdmin): list_display = ('content_type', 'key', 'content_object', 'importer') search_fields = ('key', ) admin_site.register(models.ItemKey, ItemKeyAdmin) class JsonContentTypeFormMixin(object): class Meta: model = models.JsonDataSection exclude = [] def __init__(self, *args, **kwargs): super(JsonContentTypeFormMixin, self).__init__(*args, **kwargs) choices = [] for pk, label in self.fields['content_type'].choices: if not pk: choices.append((pk, label)) continue ct = ContentType.objects.get(pk=pk) model_class = ct.model_class() if hasattr(model_class, 'data') and \ not hasattr(model_class, 'history_type'): choices.append((pk, label)) self.fields['content_type'].choices = sorted(choices, key=lambda x: x[1]) class JsonDataSectionForm(JsonContentTypeFormMixin, forms.ModelForm): class Meta: model = models.JsonDataSection exclude = [] class JsonDataSectionAdmin(admin.ModelAdmin): list_display = ['name', 'content_type', 'order'] form = JsonDataSectionForm admin_site.register(models.JsonDataSection, JsonDataSectionAdmin) class JsonDataFieldForm(JsonContentTypeFormMixin, forms.ModelForm): class Meta: model = models.JsonDataField exclude = [] class JsonDataFieldAdmin(admin.ModelAdmin): list_display = ['name', 'content_type', 'key', 'display', 'value_type', 'search_index', 'order', 'section'] actions = [ change_value('display', True, _(u"Display selected")), change_value('display', False, _(u"Hide selected")) ] list_filter = ['value_type', 'search_index'] form = JsonDataFieldForm admin_site.register(models.JsonDataField, JsonDataFieldAdmin) def get_choices_form(): cache_key, value = get_cache(models.CustomForm, ['associated-forms']) if value: return value forms = [] register, register_fields = models.CustomForm.register() for slug in register.keys(): forms.append((slug, models.CustomForm._register[slug].form_admin_name)) forms = sorted(forms, key=lambda x: x[1]) cache.set(cache_key, forms, settings.CACHE_TIMEOUT) return forms class CustomFormForm(forms.ModelForm): class Meta: model = models.CustomForm exclude = [] form = forms.ChoiceField(label=_(u"Form"), choices=get_choices_form) users = AutoCompleteSelectMultipleField('ishtaruser', required=False, label=_(u"Users")) class ExcludeFieldFormset(BaseInlineFormSet): def get_form_kwargs(self, index): kwargs = super(ExcludeFieldFormset, self).get_form_kwargs(index) if not self.instance or not self.instance.pk: return kwargs form = self.instance.get_form_class() if not form: kwargs['choices'] = [] return kwargs kwargs['choices'] = [('', '--')] + form.get_custom_fields() return kwargs class ExcludeFieldForm(forms.ModelForm): class Meta: model = models.ExcludedField exclude = [] field = forms.ChoiceField(label=_(u"Field")) def __init__(self, *args, **kwargs): choices = kwargs.pop('choices') super(ExcludeFieldForm, self).__init__(*args, **kwargs) self.fields['field'].choices = choices class ExcludeFieldInline(admin.TabularInline): model = models.ExcludedField extra = 2 form = ExcludeFieldForm formset = ExcludeFieldFormset class JsonFieldFormset(BaseInlineFormSet): def get_form_kwargs(self, index): kwargs = super(JsonFieldFormset, self).get_form_kwargs(index) if not self.instance or not self.instance.pk: return kwargs kwargs['choices'] = [('', '--')] + \ self.instance.get_available_json_fields() return kwargs class JsonFieldForm(forms.ModelForm): class Meta: model = models.CustomFormJsonField exclude = [] def __init__(self, *args, **kwargs): choices = kwargs.pop('choices') super(JsonFieldForm, self).__init__(*args, **kwargs) self.fields['json_field'].choices = choices class JsonFieldInline(admin.TabularInline): model = models.CustomFormJsonField extra = 2 form = JsonFieldForm formset = JsonFieldFormset class CustomFormAdmin(admin.ModelAdmin): list_display = ['name', 'form', 'available', 'enabled', 'apply_to_all', 'users_lbl', 'user_types_lbl'] fields = ('name', 'form', 'available', 'enabled', 'apply_to_all', 'users', 'user_types') form = CustomFormForm inlines = [ExcludeFieldInline, JsonFieldInline] def get_inline_instances(self, request, obj=None): # no inline on creation if not obj: return [] return super(CustomFormAdmin, self).get_inline_instances(request, obj=obj) def get_readonly_fields(self, request, obj=None): if obj: return ('form',) return [] admin_site.register(models.CustomForm, CustomFormAdmin) class AdministrationScriptAdmin(admin.ModelAdmin): list_display = ['name', 'path'] def get_readonly_fields(self, request, obj=None): if obj: return ('path',) return [] admin_site.register(models.AdministrationScript, AdministrationScriptAdmin) class AdministrationTaskAdmin(admin.ModelAdmin): readonly_fields = ('state', 'creation_date', 'launch_date', 'finished_date', "result", ) list_display = ['script', 'state', 'creation_date', 'launch_date', 'finished_date', "result"] list_filter = ['script', 'state'] def get_readonly_fields(self, request, obj=None): if obj: return ("script", ) + self.readonly_fields return self.readonly_fields admin_site.register(models.AdministrationTask, AdministrationTaskAdmin) class UserProfileAdmin(admin.ModelAdmin): list_display = ['person', 'profile_type', 'area_labels'] list_filter = ['profile_type'] search_fields = ['person__raw_name'] model = models.UserProfile form = make_ajax_form(model, {'areas': 'area'}) admin_site.register(models.UserProfile, UserProfileAdmin) basic_models = [models.DocumentTemplate] for model in basic_models: admin_site.register(model)