diff options
author | Étienne Loks <etienne.loks@iggdrasil.net> | 2023-10-13 17:05:11 +0200 |
---|---|---|
committer | Étienne Loks <etienne.loks@iggdrasil.net> | 2024-02-05 10:49:37 +0100 |
commit | fb9ca2c2704768dcd8b1b2d27741a2c043834f56 (patch) | |
tree | 5a468a927f57339a4b3467964b2cfb3ec88dd320 /ishtar_common/utils.py | |
parent | f6ff3b698c548098023135b97e62ec69a23bdd3c (diff) | |
download | Ishtar-fb9ca2c2704768dcd8b1b2d27741a2c043834f56.tar.bz2 Ishtar-fb9ca2c2704768dcd8b1b2d27741a2c043834f56.zip |
✨ background task: set a low priority queue for imports
Diffstat (limited to 'ishtar_common/utils.py')
-rw-r--r-- | ishtar_common/utils.py | 36 |
1 files changed, 29 insertions, 7 deletions
diff --git a/ishtar_common/utils.py b/ishtar_common/utils.py index d5ace858d..44d42f00f 100644 --- a/ishtar_common/utils.py +++ b/ishtar_common/utils.py @@ -124,6 +124,12 @@ if settings.USE_BACKGROUND_TASK: pass +@task() +def test_task(arg): + print("Running task with priority: {}".format(arg)) + time.sleep(3) + + class BColors: """ Bash colors. Don't forget to finish your colored string with ENDC. @@ -513,7 +519,9 @@ def revoke_old_task(kwargs, action_name, task_id, instance_cls): cache.set(key, task_id, settings.CACHE_TIMEOUT * 4) -def load_task(task_func, task_name, checks, sender, **kwargs): +def load_task(task_func, task_name, checks, sender, queue=None, **kwargs): + if not queue: + queue = settings.CELERY_DEFAULT_QUEUE instance = kwargs.get("instance", None) if not instance: return @@ -539,7 +547,8 @@ def load_task(task_func, task_name, checks, sender, **kwargs): sender, kwargs = serialize_args_for_tasks( sender, instance, kwargs, EXTRA_KWARGS_TRIGGER ) - task_item = task_func.delay(sender, **kwargs) + kwargs["queue"] = queue + task_item = task_func.apply_async([sender], kwargs, queue=queue) revoke_old_task(kwargs, task_name, task_item.id, instance.__class__) return task_item @@ -553,9 +562,11 @@ def cached_label_changed(sender, **kwargs): if hasattr(instance, "_timestamp") and hasattr(instance, "timestamp_label") and ( instance.timestamp_label or 0) >= (instance._timestamp or 0): return + queue = getattr(instance, "_queue", settings.CELERY_DEFAULT_QUEUE) if hasattr(instance, "external_id") and hasattr(instance, "auto_external_id") \ and hasattr(instance, "SLUG") and not getattr(instance, "_external_id_checked", None): - changed = load_task(_external_id_changed, "external_id_changed", None, sender, **kwargs) + changed = load_task(_external_id_changed, "external_id_changed", None, sender, + queue=queue, **kwargs) if changed and ( not settings.USE_BACKGROUND_TASK or not instance.pk @@ -569,7 +580,7 @@ def cached_label_changed(sender, **kwargs): if not force_update and getattr(instance, "_cached_label_checked", False): return return load_task(_cached_label_changed, "cached_label_changed", None, sender, - **kwargs) + queue=queue, **kwargs) @task() @@ -590,6 +601,7 @@ def _cached_label_changed(sender, **kwargs): return instance.__class__.objects.filter(pk=instance.pk).update(timestamp_label=instance._timestamp) + instance._queue = kwargs.get("queue", settings.CELERY_DEFAULT_QUEUE) logger.debug(f"[ishtar] ishtar_common.utils._cached_label_changed - {instance.__class__.__name__} - {instance.pk} - {instance}") if hasattr(instance, "refresh_cache"): instance.refresh_cache() @@ -666,8 +678,9 @@ def external_id_changed(sender, **kwargs): return if getattr(instance, "_external_id_checked", None): return + queue = getattr(instance, "_queue", settings.CELERY_DEFAULT_QUEUE) return load_task(_external_id_changed, "external_id_changed", - ["_external_id_changed"], sender, **kwargs) + ["_external_id_changed"], sender, queue=queue, **kwargs) @task() @@ -679,6 +692,7 @@ def _external_id_changed(sender, **kwargs): return if getattr(instance, "_external_id_checked", None): return + instance._queue = kwargs.get("queue", settings.CELERY_DEFAULT_QUEUE) updated = False if not instance.external_id or instance.auto_external_id: external_id = get_generated_id(instance.SLUG + "_external_id", instance) @@ -839,8 +853,12 @@ def get_srid_obj_from_point(point): def post_save_geodata(sender, **kwargs): + instance = kwargs["instance"] + if not instance: + return + queue = getattr(instance, "_queue", settings.CELERY_DEFAULT_QUEUE) return load_task(_post_save_geodata, "post_save_geodata", ["_no_geo_check"], sender, - **kwargs) + queue=queue, **kwargs) @task() @@ -855,6 +873,7 @@ def _post_save_geodata(sender, **kwargs): if getattr(instance, "_post_saved_geo", False): return + instance._queue = kwargs.get("queue", settings.CELERY_DEFAULT_QUEUE) modified = False if getattr(instance, "post_save_geo", False): # TODO: geovectordata -> no post_save_geo: delete? @@ -904,8 +923,9 @@ def post_save_geo(sender, **kwargs): return if getattr(instance, "_post_saved_geo", False): return + queue = getattr(instance, "_queue", settings.CELERY_DEFAULT_QUEUE) return load_task(_post_save_geo, "post_save_geo", ["_no_geo_check"], - sender, **kwargs) + sender, queue=queue, **kwargs) @task() @@ -928,6 +948,7 @@ def _post_save_geo(sender, **kwargs): return instance.__class__.objects.filter(pk=instance.pk).update(timestamp_geo=instance._timestamp) + instance._queue = kwargs.get("queue", settings.CELERY_DEFAULT_QUEUE) logger.debug(f"[ishtar] ishtar_common.utils._post_save_geo - {instance.__class__.__name__} - {instance.pk} - {instance}") instance._post_saved_geo = True @@ -1627,6 +1648,7 @@ def m2m_historization_changed(sender, **kwargs): obj = kwargs.get("instance", None) if not obj: return + obj._queue = kwargs.get("queue", settings.CELERY_DEFAULT_QUEUE) hist_values = obj.history_m2m or {} for attr in obj.HISTORICAL_M2M: values = [] |