diff options
author | Étienne Loks <etienne.loks@iggdrasil.net> | 2021-08-26 16:40:16 +0200 |
---|---|---|
committer | Étienne Loks <etienne.loks@iggdrasil.net> | 2022-07-08 09:58:48 +0200 |
commit | bfe367742d2629f810d762dfe1e724ca357f0612 (patch) | |
tree | 1426e381a35314b004f10fdb379971190b7062db /ishtar_common/utils.py | |
parent | afb17f0bbdbba3c05ac264e85db6bc1c64f60dea (diff) | |
download | Ishtar-bfe367742d2629f810d762dfe1e724ca357f0612.tar.bz2 Ishtar-bfe367742d2629f810d762dfe1e724ca357f0612.zip |
Celery workers: revoke non necessary tasks
Diffstat (limited to 'ishtar_common/utils.py')
-rw-r--r-- | ishtar_common/utils.py | 21 |
1 files changed, 19 insertions, 2 deletions
diff --git a/ishtar_common/utils.py b/ishtar_common/utils.py index 5bd5f4ec5..3d3131570 100644 --- a/ishtar_common/utils.py +++ b/ishtar_common/utils.py @@ -101,11 +101,16 @@ def fake_task(*args): task = fake_task +celery_app = None + if settings.USE_BACKGROUND_TASK: try: from celery import shared_task task = shared_task + celery_app = getattr( + import_module(settings.ROOT_PATH.split("/")[-2] + ".celery_app"), "app" + ) except ModuleNotFoundError: pass @@ -474,6 +479,14 @@ def cached_label_and_geo_changed(sender, **kwargs): post_save_geo(sender=sender, **kwargs) +def revoke_old_task(kwargs, action_name, task_id, instance_cls): + kwargs["action"] = action_name + key, old_task_id = get_cache(instance_cls, kwargs) + if old_task_id: + celery_app.control.revoke(old_task_id) + cache.set(key, task_id, settings.CACHE_TIMEOUT * 4) + + def cached_label_changed(sender, **kwargs): if not kwargs.get("instance"): return @@ -504,7 +517,9 @@ def cached_label_changed(sender, **kwargs): sender, kwargs = serialize_args_for_tasks( sender, instance, kwargs, EXTRA_KWARGS_TRIGGER ) - return _cached_label_changed.delay(sender, **kwargs) + task_item = _cached_label_changed.delay(sender, **kwargs) + revoke_old_task(kwargs, "cached_label_changed", task_item.id, instance.__class__) + return task_item @task() @@ -735,7 +750,9 @@ def post_save_geo(sender, **kwargs): sender, kwargs = serialize_args_for_tasks( sender, instance, kwargs, EXTRA_KWARGS_TRIGGER ) - return _post_save_geo.delay(sender, **kwargs) + task_item = _post_save_geo.delay(sender, **kwargs) + revoke_old_task(kwargs, "post_save_geo", task_item.id, instance.__class__) + return task_item @task() |