#!/usr/bin/env python # -*- coding: utf-8 -*- # Copyright (C) 2012 Étienne Loks # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as # published by the Free Software Foundation, either version 3 of the # License, or (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # You should have received a copy of the GNU General Public License # along with this program. If not, see . # See the file COPYING for details. from celery.decorators import task from django.core.exceptions import ObjectDoesNotExist from django.utils.translation import ugettext_lazy as _ from chimere.models import Importer def single_instance_task(timeout): def task_exc(func): def wrapper(*args, **kwargs): lock_id = "celery-single-instance-" + func.__name__ acquire_lock = lambda: cache.add(lock_id, "true", timeout) release_lock = lambda: cache.delete(lock_id) if acquire_lock(): try: func() finally: release_lock() return wrapper return task_exc IMPORT_MESSAGES = { 'import_pending':[_(u"Import pending")], 'import_process':[_(u"Import processing")], 'import_done':[_(u"Import successfuly done"), _(u" %(new)d new item(s), %(updated)d updated item(s)")], 'import_failed':[_(u"Import failed"), "%s"], 'import_cancel':[_(u"Import canceled")], 'export_pending':[_(u"Export pending")], 'export_process':[_(u"Export processing")], 'export_done':[_(u"Export successfuly done"), _(u" %(updated)d updated item(s)")], 'export_failed':[_(u"Export failed"), "%s"], 'export_cancel':[_(u"Export canceled")] } @task() def importing(importer_pk): try: importer = Importer.objects.get(pk=importer_pk) except ObjectDoesNotExist: # importer doesn't exists anymore: too late! return if importer.state != IMPORT_MESSAGES['import_pending'][0]: # import canceled or done return importer.state = unicode(IMPORT_MESSAGES['import_process'][0]) importer.save() new_item, updated_item, error = importer.manager.get() if error: importer.state = unicode(IMPORT_MESSAGES['import_failed'][0]) \ + u" - " + unicode(IMPORT_MESSAGES['import_failed'][1]) % error importer.save() return importer.state = unicode(IMPORT_MESSAGES['import_done'][0]) + u" - " \ + unicode(IMPORT_MESSAGES['import_done'][1]) % {'new':new_item, 'updated':updated_item} importer.save() return True @task() @single_instance_task(60*10) def exporting(importer_pk): try: importer = Importer.objects.get(pk=importer_pk) except ObjectDoesNotExist: # importer doesn't exists anymore: too late! return if importer.state != IMPORT_MESSAGES['export_pending'][0]: # import canceled or done return importer.state = unicode(IMPORT_MESSAGES['export_process'][0]) importer.save() updated_item, error = importer.manager.put() if error: importer.state = unicode(IMPORT_MESSAGES['export_failed'][0]) \ + u" - " + unicode(IMPORT_MESSAGES['export_failed'][1]) % error importer.save() return importer.state = unicode(IMPORT_MESSAGES['export_done'][0]) + u" - " \ + unicode(IMPORT_MESSAGES['export_done'][1]) % {'updated':updated_item} importer.save() return True