From 270f94028d94aa606c95bf40bb672e69046fc57c Mon Sep 17 00:00:00 2001 From: Étienne Loks Date: Thu, 26 Aug 2021 16:40:16 +0200 Subject: Celery workers: revoke non necessary tasks --- ishtar_common/utils.py | 21 +++++++++++++++++++-- 1 file changed, 19 insertions(+), 2 deletions(-) (limited to 'ishtar_common') diff --git a/ishtar_common/utils.py b/ishtar_common/utils.py index e6283ae5e..e4aade575 100644 --- a/ishtar_common/utils.py +++ b/ishtar_common/utils.py @@ -100,11 +100,16 @@ def fake_task(*args): task = fake_task +celery_app = None + if settings.USE_BACKGROUND_TASK: try: from celery import shared_task task = shared_task + celery_app = getattr( + import_module(settings.ROOT_PATH.split("/")[-2] + ".celery_app"), "app" + ) except ModuleNotFoundError: pass @@ -473,6 +478,14 @@ def cached_label_and_geo_changed(sender, **kwargs): post_save_geo(sender=sender, **kwargs) +def revoke_old_task(kwargs, action_name, task_id, instance_cls): + kwargs["action"] = action_name + key, old_task_id = get_cache(instance_cls, kwargs) + if old_task_id: + celery_app.control.revoke(old_task_id) + cache.set(key, task_id, settings.CACHE_TIMEOUT * 4) + + def cached_label_changed(sender, **kwargs): if not kwargs.get("instance"): return @@ -503,7 +516,9 @@ def cached_label_changed(sender, **kwargs): sender, kwargs = serialize_args_for_tasks( sender, instance, kwargs, EXTRA_KWARGS_TRIGGER ) - return _cached_label_changed.delay(sender, **kwargs) + task_item = _cached_label_changed.delay(sender, **kwargs) + revoke_old_task(kwargs, "cached_label_changed", task_item.id, instance.__class__) + return task_item @task() @@ -732,7 +747,9 @@ def post_save_geo(sender, **kwargs): sender, kwargs = serialize_args_for_tasks( sender, instance, kwargs, EXTRA_KWARGS_TRIGGER ) - return _post_save_geo.delay(sender, **kwargs) + task_item = _post_save_geo.delay(sender, **kwargs) + revoke_old_task(kwargs, "post_save_geo", task_item.id, instance.__class__) + return task_item @task() -- cgit v1.2.3