summaryrefslogtreecommitdiff
path: root/ishtar_common/utils.py
diff options
context:
space:
mode:
Diffstat (limited to 'ishtar_common/utils.py')
-rw-r--r--ishtar_common/utils.py36
1 files changed, 29 insertions, 7 deletions
diff --git a/ishtar_common/utils.py b/ishtar_common/utils.py
index 074dc9ef7..621e96b11 100644
--- a/ishtar_common/utils.py
+++ b/ishtar_common/utils.py
@@ -125,6 +125,12 @@ if settings.USE_BACKGROUND_TASK:
pass
+@task()
+def test_task(arg):
+ print("Running task with priority: {}".format(arg))
+ time.sleep(3)
+
+
class BColors:
"""
Bash colors. Don't forget to finish your colored string with ENDC.
@@ -514,7 +520,9 @@ def revoke_old_task(kwargs, action_name, task_id, instance_cls):
cache.set(key, task_id, settings.CACHE_TIMEOUT * 4)
-def load_task(task_func, task_name, checks, sender, **kwargs):
+def load_task(task_func, task_name, checks, sender, queue=None, **kwargs):
+ if not queue:
+ queue = settings.CELERY_DEFAULT_QUEUE
instance = kwargs.get("instance", None)
if not instance:
return
@@ -540,7 +548,8 @@ def load_task(task_func, task_name, checks, sender, **kwargs):
sender, kwargs = serialize_args_for_tasks(
sender, instance, kwargs, EXTRA_KWARGS_TRIGGER
)
- task_item = task_func.delay(sender, **kwargs)
+ kwargs["queue"] = queue
+ task_item = task_func.apply_async([sender], kwargs, queue=queue)
revoke_old_task(kwargs, task_name, task_item.id, instance.__class__)
return task_item
@@ -554,9 +563,11 @@ def cached_label_changed(sender, **kwargs):
if hasattr(instance, "_timestamp") and hasattr(instance, "timestamp_label") and (
instance.timestamp_label or 0) >= (instance._timestamp or 0):
return
+ queue = getattr(instance, "_queue", settings.CELERY_DEFAULT_QUEUE)
if hasattr(instance, "external_id") and hasattr(instance, "auto_external_id") \
and hasattr(instance, "SLUG") and not getattr(instance, "_external_id_checked", None):
- changed = load_task(_external_id_changed, "external_id_changed", None, sender, **kwargs)
+ changed = load_task(_external_id_changed, "external_id_changed", None, sender,
+ queue=queue, **kwargs)
if changed and (
not settings.USE_BACKGROUND_TASK
or not instance.pk
@@ -570,7 +581,7 @@ def cached_label_changed(sender, **kwargs):
if not force_update and getattr(instance, "_cached_label_checked", False):
return
return load_task(_cached_label_changed, "cached_label_changed", None, sender,
- **kwargs)
+ queue=queue, **kwargs)
@task()
@@ -591,6 +602,7 @@ def _cached_label_changed(sender, **kwargs):
return
instance.__class__.objects.filter(pk=instance.pk).update(timestamp_label=instance._timestamp)
+ instance._queue = kwargs.get("queue", settings.CELERY_DEFAULT_QUEUE)
logger.debug(f"[ishtar] ishtar_common.utils._cached_label_changed - {instance.__class__.__name__} - {instance.pk} - {instance}")
if hasattr(instance, "refresh_cache"):
instance.refresh_cache()
@@ -667,8 +679,9 @@ def external_id_changed(sender, **kwargs):
return
if getattr(instance, "_external_id_checked", None):
return
+ queue = getattr(instance, "_queue", settings.CELERY_DEFAULT_QUEUE)
return load_task(_external_id_changed, "external_id_changed",
- ["_external_id_changed"], sender, **kwargs)
+ ["_external_id_changed"], sender, queue=queue, **kwargs)
@task()
@@ -680,6 +693,7 @@ def _external_id_changed(sender, **kwargs):
return
if getattr(instance, "_external_id_checked", None):
return
+ instance._queue = kwargs.get("queue", settings.CELERY_DEFAULT_QUEUE)
updated = False
if not instance.external_id or instance.auto_external_id:
external_id = get_generated_id(instance.SLUG + "_external_id", instance)
@@ -840,8 +854,12 @@ def get_srid_obj_from_point(point):
def post_save_geodata(sender, **kwargs):
+ instance = kwargs["instance"]
+ if not instance:
+ return
+ queue = getattr(instance, "_queue", settings.CELERY_DEFAULT_QUEUE)
return load_task(_post_save_geodata, "post_save_geodata", ["_no_geo_check"], sender,
- **kwargs)
+ queue=queue, **kwargs)
@task()
@@ -856,6 +874,7 @@ def _post_save_geodata(sender, **kwargs):
if getattr(instance, "_post_saved_geo", False):
return
+ instance._queue = kwargs.get("queue", settings.CELERY_DEFAULT_QUEUE)
modified = False
if getattr(instance, "post_save_geo", False):
# TODO: geovectordata -> no post_save_geo: delete?
@@ -905,8 +924,9 @@ def post_save_geo(sender, **kwargs):
return
if getattr(instance, "_post_saved_geo", False):
return
+ queue = getattr(instance, "_queue", settings.CELERY_DEFAULT_QUEUE)
return load_task(_post_save_geo, "post_save_geo", ["_no_geo_check"],
- sender, **kwargs)
+ sender, queue=queue, **kwargs)
@task()
@@ -929,6 +949,7 @@ def _post_save_geo(sender, **kwargs):
return
instance.__class__.objects.filter(pk=instance.pk).update(timestamp_geo=instance._timestamp)
+ instance._queue = kwargs.get("queue", settings.CELERY_DEFAULT_QUEUE)
logger.debug(f"[ishtar] ishtar_common.utils._post_save_geo - {instance.__class__.__name__} - {instance.pk} - {instance}")
instance._post_saved_geo = True
@@ -1628,6 +1649,7 @@ def m2m_historization_changed(sender, **kwargs):
obj = kwargs.get("instance", None)
if not obj:
return
+ obj._queue = kwargs.get("queue", settings.CELERY_DEFAULT_QUEUE)
hist_values = obj.history_m2m or {}
for attr in obj.HISTORICAL_M2M:
values = []