summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorÉtienne Loks <etienne.loks@iggdrasil.net>2021-08-26 16:40:16 +0200
committerÉtienne Loks <etienne.loks@iggdrasil.net>2021-09-01 14:25:22 +0200
commit567c0d596bea28a91e90336c38cc80b5011855bc (patch)
tree30e8d00c7ed20ad92f4cab31db0165ceb7b9b445
parent1493e5b19725eae0f57e827e464483ed8bd98635 (diff)
downloadIshtar-567c0d596bea28a91e90336c38cc80b5011855bc.tar.bz2
Ishtar-567c0d596bea28a91e90336c38cc80b5011855bc.zip
Celery workers: revoke non necessary tasks
-rw-r--r--ishtar_common/utils.py21
1 files changed, 19 insertions, 2 deletions
diff --git a/ishtar_common/utils.py b/ishtar_common/utils.py
index ed183e4d1..2e9039155 100644
--- a/ishtar_common/utils.py
+++ b/ishtar_common/utils.py
@@ -100,11 +100,16 @@ def fake_task(*args):
task = fake_task
+celery_app = None
+
if settings.USE_BACKGROUND_TASK:
try:
from celery import shared_task
task = shared_task
+ celery_app = getattr(
+ import_module(settings.ROOT_PATH.split("/")[-2] + ".celery_app"), "app"
+ )
except ModuleNotFoundError:
pass
@@ -473,6 +478,14 @@ def cached_label_and_geo_changed(sender, **kwargs):
post_save_geo(sender=sender, **kwargs)
+def revoke_old_task(kwargs, action_name, task_id, instance_cls):
+ kwargs["action"] = action_name
+ key, old_task_id = get_cache(instance_cls, kwargs)
+ if old_task_id:
+ celery_app.control.revoke(old_task_id)
+ cache.set(key, task_id, settings.CACHE_TIMEOUT * 4)
+
+
def cached_label_changed(sender, **kwargs):
if not kwargs.get("instance"):
return
@@ -503,7 +516,9 @@ def cached_label_changed(sender, **kwargs):
sender, kwargs = serialize_args_for_tasks(
sender, instance, kwargs, EXTRA_KWARGS_TRIGGER
)
- return _cached_label_changed.delay(sender, **kwargs)
+ task_item = _cached_label_changed.delay(sender, **kwargs)
+ revoke_old_task(kwargs, "cached_label_changed", task_item.id, instance.__class__)
+ return task_item
@task()
@@ -732,7 +747,9 @@ def post_save_geo(sender, **kwargs):
sender, kwargs = serialize_args_for_tasks(
sender, instance, kwargs, EXTRA_KWARGS_TRIGGER
)
- return _post_save_geo.delay(sender, **kwargs)
+ task_item = _post_save_geo.delay(sender, **kwargs)
+ revoke_old_task(kwargs, "post_save_geo", task_item.id, instance.__class__)
+ return task_item
@task()