summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorÉtienne Loks <etienne.loks@iggdrasil.net>2021-08-26 16:40:16 +0200
committerÉtienne Loks <etienne.loks@iggdrasil.net>2022-07-08 09:58:48 +0200
commitbfe367742d2629f810d762dfe1e724ca357f0612 (patch)
tree1426e381a35314b004f10fdb379971190b7062db
parentafb17f0bbdbba3c05ac264e85db6bc1c64f60dea (diff)
downloadIshtar-bfe367742d2629f810d762dfe1e724ca357f0612.tar.bz2
Ishtar-bfe367742d2629f810d762dfe1e724ca357f0612.zip
Celery workers: revoke non necessary tasks
-rw-r--r--ishtar_common/utils.py21
1 files changed, 19 insertions, 2 deletions
diff --git a/ishtar_common/utils.py b/ishtar_common/utils.py
index 5bd5f4ec5..3d3131570 100644
--- a/ishtar_common/utils.py
+++ b/ishtar_common/utils.py
@@ -101,11 +101,16 @@ def fake_task(*args):
task = fake_task
+celery_app = None
+
if settings.USE_BACKGROUND_TASK:
try:
from celery import shared_task
task = shared_task
+ celery_app = getattr(
+ import_module(settings.ROOT_PATH.split("/")[-2] + ".celery_app"), "app"
+ )
except ModuleNotFoundError:
pass
@@ -474,6 +479,14 @@ def cached_label_and_geo_changed(sender, **kwargs):
post_save_geo(sender=sender, **kwargs)
+def revoke_old_task(kwargs, action_name, task_id, instance_cls):
+ kwargs["action"] = action_name
+ key, old_task_id = get_cache(instance_cls, kwargs)
+ if old_task_id:
+ celery_app.control.revoke(old_task_id)
+ cache.set(key, task_id, settings.CACHE_TIMEOUT * 4)
+
+
def cached_label_changed(sender, **kwargs):
if not kwargs.get("instance"):
return
@@ -504,7 +517,9 @@ def cached_label_changed(sender, **kwargs):
sender, kwargs = serialize_args_for_tasks(
sender, instance, kwargs, EXTRA_KWARGS_TRIGGER
)
- return _cached_label_changed.delay(sender, **kwargs)
+ task_item = _cached_label_changed.delay(sender, **kwargs)
+ revoke_old_task(kwargs, "cached_label_changed", task_item.id, instance.__class__)
+ return task_item
@task()
@@ -735,7 +750,9 @@ def post_save_geo(sender, **kwargs):
sender, kwargs = serialize_args_for_tasks(
sender, instance, kwargs, EXTRA_KWARGS_TRIGGER
)
- return _post_save_geo.delay(sender, **kwargs)
+ task_item = _post_save_geo.delay(sender, **kwargs)
+ revoke_old_task(kwargs, "post_save_geo", task_item.id, instance.__class__)
+ return task_item
@task()