diff options
author | Étienne Loks <etienne.loks@iggdrasil.net> | 2021-08-26 16:40:16 +0200 |
---|---|---|
committer | Étienne Loks <etienne.loks@iggdrasil.net> | 2021-09-01 14:25:22 +0200 |
commit | 567c0d596bea28a91e90336c38cc80b5011855bc (patch) | |
tree | 30e8d00c7ed20ad92f4cab31db0165ceb7b9b445 | |
parent | 1493e5b19725eae0f57e827e464483ed8bd98635 (diff) | |
download | Ishtar-567c0d596bea28a91e90336c38cc80b5011855bc.tar.bz2 Ishtar-567c0d596bea28a91e90336c38cc80b5011855bc.zip |
Celery workers: revoke non necessary tasks
-rw-r--r-- | ishtar_common/utils.py | 21 |
1 files changed, 19 insertions, 2 deletions
diff --git a/ishtar_common/utils.py b/ishtar_common/utils.py index ed183e4d1..2e9039155 100644 --- a/ishtar_common/utils.py +++ b/ishtar_common/utils.py @@ -100,11 +100,16 @@ def fake_task(*args): task = fake_task +celery_app = None + if settings.USE_BACKGROUND_TASK: try: from celery import shared_task task = shared_task + celery_app = getattr( + import_module(settings.ROOT_PATH.split("/")[-2] + ".celery_app"), "app" + ) except ModuleNotFoundError: pass @@ -473,6 +478,14 @@ def cached_label_and_geo_changed(sender, **kwargs): post_save_geo(sender=sender, **kwargs) +def revoke_old_task(kwargs, action_name, task_id, instance_cls): + kwargs["action"] = action_name + key, old_task_id = get_cache(instance_cls, kwargs) + if old_task_id: + celery_app.control.revoke(old_task_id) + cache.set(key, task_id, settings.CACHE_TIMEOUT * 4) + + def cached_label_changed(sender, **kwargs): if not kwargs.get("instance"): return @@ -503,7 +516,9 @@ def cached_label_changed(sender, **kwargs): sender, kwargs = serialize_args_for_tasks( sender, instance, kwargs, EXTRA_KWARGS_TRIGGER ) - return _cached_label_changed.delay(sender, **kwargs) + task_item = _cached_label_changed.delay(sender, **kwargs) + revoke_old_task(kwargs, "cached_label_changed", task_item.id, instance.__class__) + return task_item @task() @@ -732,7 +747,9 @@ def post_save_geo(sender, **kwargs): sender, kwargs = serialize_args_for_tasks( sender, instance, kwargs, EXTRA_KWARGS_TRIGGER ) - return _post_save_geo.delay(sender, **kwargs) + task_item = _post_save_geo.delay(sender, **kwargs) + revoke_old_task(kwargs, "post_save_geo", task_item.id, instance.__class__) + return task_item @task() |