summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author Étienne Loks <etienne.loks@iggdrasil.net> 2023-10-13 17:05:11 +0200
committer Étienne Loks <etienne.loks@iggdrasil.net> 2024-02-05 10:49:37 +0100
commit fb9ca2c2704768dcd8b1b2d27741a2c043834f56 (patch)
tree 5a468a927f57339a4b3467964b2cfb3ec88dd320
parent f6ff3b698c548098023135b97e62ec69a23bdd3c (diff)
download Ishtar-fb9ca2c2704768dcd8b1b2d27741a2c043834f56.tar.bz2
Ishtar-fb9ca2c2704768dcd8b1b2d27741a2c043834f56.zip
✨ background task: set a low priority queue for imports
-rw-r--r-- Makefile.example 4
-rw-r--r-- example_project/celery_app.py.sample 4
-rw-r--r-- example_project/settings.py 2
-rw-r--r-- ishtar_common/data_importer.py 10
-rw-r--r-- ishtar_common/models_common.py 2
-rw-r--r-- ishtar_common/utils.py 36
-rw-r--r-- ishtar_common/utils_celery.py 9
7 files changed, 57 insertions, 10 deletions
diff --git a/Makefile.example b/Makefile.example
index 7773481e6..c23b333d5 100644
--- a/Makefile.example
+++ b/Makefile.example
@@ -183,7 +183,9 @@ runalt: ## run test server on port 9000
cd $(project); $(PYTHON) manage.py runserver 0.0.0.0:9000
runcelery: ## run a celery worker
- celery -A example_project worker -l INFO # -l DEBUG -f debug-celery.log
+ celery -A example_project worker -Q celery -l INFO &
+ celery -A example_project worker -Q low_priority -l INFO
+ # -l DEBUG -f debug-celery.log
blackd: ## blackd service
$(VENV)bin/blackd
diff --git a/example_project/celery_app.py.sample b/example_project/celery_app.py.sample
index 81000bc14..ef8061cbf 100644
--- a/example_project/celery_app.py.sample
+++ b/example_project/celery_app.py.sample
@@ -1,6 +1,6 @@
import os
from celery import Celery
-
+from ishtar_common.utils_celery import config_celery_queue
# change example_project with your project name
project_name = "example_project"
@@ -9,6 +9,6 @@ project_name = "example_project"
os.environ.setdefault('DJANGO_SETTINGS_MODULE', project_name + '.settings')
app = Celery(project_name)
-
+config_celery_queue(app.conf)
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()
diff --git a/example_project/settings.py b/example_project/settings.py
index 3c0accc82..8e9077aad 100644
--- a/example_project/settings.py
+++ b/example_project/settings.py
@@ -410,6 +410,8 @@ if LOG_LEVEL:
if logger.startswith("archaeological") or logger.startswith("ishtar"):
LOGGING["loggers"][logger]["level"] = LOG_LEVEL
+CELERY_DEFAULT_QUEUE = "celery"
+
if USE_BACKGROUND_TASK:
if not CELERY_BROKER_URL:
CELERY_BROKER_URL = "amqp://localhost"
diff --git a/ishtar_common/data_importer.py b/ishtar_common/data_importer.py
index 7bbb0753c..6a58e0409 100644
--- a/ishtar_common/data_importer.py
+++ b/ishtar_common/data_importer.py
@@ -883,6 +883,7 @@ class Importer(object):
# force django based post-processing for the item
item = cls.objects.get(pk=pk)
item._timestamp = self.timestamp
+ item._queue = "low_priority"
item.save()
if hasattr(item, "RELATED_POST_PROCESS"):
for related_key in item.RELATED_POST_PROCESS:
@@ -897,6 +898,7 @@ class Importer(object):
try:
item = cls.objects.get(pk=pk)
item._timestamp = self.timestamp
+ item._queue = "low_priority"
item.save()
if hasattr(item, "fix"):
# post save/m2m specific fix
@@ -1105,6 +1107,7 @@ class Importer(object):
def _create_item(self, cls, dct, idx_line):
obj = cls(**dct)
obj._no_post_save = True # delayed at the end of the import
+ obj._queue = "low_priority"
obj.save()
self._add_to_post_save(cls, obj.pk, idx_line)
return obj
@@ -1219,6 +1222,7 @@ class Importer(object):
setattr(obj, k, data["defaults"][k])
obj._no_post_save = True
obj._timestamp = self.timestamp
+ obj._queue = "low_priority"
obj.save()
self._add_to_post_save(obj.__class__, obj.pk, idx_line)
@@ -1258,6 +1262,7 @@ class Importer(object):
for k in geodata:
setattr(item, k, geodata[k])
item._timestamp = self.timestamp
+ item._queue = "low_priority"
item.save()
else:
item = GeoVectorData.objects.create(**geodata)
@@ -1273,6 +1278,7 @@ class Importer(object):
obj._post_saved_geo = True
obj._no_move = True
obj.skip_history_when_saving = True
+ obj._queue = "low_priority"
obj.save()
n = datetime.datetime.now()
@@ -1345,6 +1351,7 @@ class Importer(object):
setattr(t_obj, k, data["defaults"][k])
t_obj._no_post_save = True
t_obj._timestamp = self.timestamp
+ t_obj._queue = "low_priority"
t_obj.save()
self._add_to_post_save(t_obj.__class__, t_obj.pk, idx_line)
if self.import_instance and hasattr(t_obj, "imports") and created:
@@ -1685,6 +1692,7 @@ class Importer(object):
setattr(v, k, extra_fields[k])
if changed:
v._timestamp = self.timestamp
+ v._queue = "low_priority"
v.save()
for att, objs in m2m_m2ms:
if type(objs) not in (list, tuple):
@@ -2034,6 +2042,7 @@ class Importer(object):
for k in updated_dct:
setattr(obj, k, updated_dct[k])
obj._timestamp = self.timestamp
+ obj._queue = "low_priority"
obj.save()
if (
not self.simulate
@@ -2103,6 +2112,7 @@ class Importer(object):
v._no_post_save = True
try:
v._timestamp = self.timestamp
+ v._queue = "low_priority"
v.save()
except DatabaseError as e:
raise IntegrityError(e.message)
diff --git a/ishtar_common/models_common.py b/ishtar_common/models_common.py
index bde457a3c..ccc70d81c 100644
--- a/ishtar_common/models_common.py
+++ b/ishtar_common/models_common.py
@@ -3340,6 +3340,7 @@ class MainItem(ShortMenuItem, SerializeItem):
return
if getattr(self, "_no_down_model_update", False):
return
+ queue = getattr(self, "_queue", settings.CELERY_DEFAULT_QUEUE)
for down_model in self.DOWN_MODEL_UPDATE:
if not settings.USE_BACKGROUND_TASK:
rel = getattr(self, down_model)
@@ -3349,6 +3350,7 @@ class MainItem(ShortMenuItem, SerializeItem):
for item in getattr(self, down_model).all():
if hasattr(self, "_timestamp"):
item._timestamp = self._timestamp
+ item._queue = queue
if hasattr(item, "cached_label_changed"):
item.cached_label_changed()
if hasattr(item, "main_geodata"):
diff --git a/ishtar_common/utils.py b/ishtar_common/utils.py
index d5ace858d..44d42f00f 100644
--- a/ishtar_common/utils.py
+++ b/ishtar_common/utils.py
@@ -124,6 +124,12 @@ if settings.USE_BACKGROUND_TASK:
pass
+@task()
+def test_task(arg):
+ print("Running task with priority: {}".format(arg))
+ time.sleep(3)
+
+
class BColors:
"""
Bash colors. Don't forget to finish your colored string with ENDC.
@@ -513,7 +519,9 @@ def revoke_old_task(kwargs, action_name, task_id, instance_cls):
cache.set(key, task_id, settings.CACHE_TIMEOUT * 4)
-def load_task(task_func, task_name, checks, sender, **kwargs):
+def load_task(task_func, task_name, checks, sender, queue=None, **kwargs):
+ if not queue:
+ queue = settings.CELERY_DEFAULT_QUEUE
instance = kwargs.get("instance", None)
if not instance:
return
@@ -539,7 +547,8 @@ def load_task(task_func, task_name, checks, sender, **kwargs):
sender, kwargs = serialize_args_for_tasks(
sender, instance, kwargs, EXTRA_KWARGS_TRIGGER
)
- task_item = task_func.delay(sender, **kwargs)
+ kwargs["queue"] = queue
+ task_item = task_func.apply_async([sender], kwargs, queue=queue)
revoke_old_task(kwargs, task_name, task_item.id, instance.__class__)
return task_item
@@ -553,9 +562,11 @@ def cached_label_changed(sender, **kwargs):
if hasattr(instance, "_timestamp") and hasattr(instance, "timestamp_label") and (
instance.timestamp_label or 0) >= (instance._timestamp or 0):
return
+ queue = getattr(instance, "_queue", settings.CELERY_DEFAULT_QUEUE)
if hasattr(instance, "external_id") and hasattr(instance, "auto_external_id") \
and hasattr(instance, "SLUG") and not getattr(instance, "_external_id_checked", None):
- changed = load_task(_external_id_changed, "external_id_changed", None, sender, **kwargs)
+ changed = load_task(_external_id_changed, "external_id_changed", None, sender,
+ queue=queue, **kwargs)
if changed and (
not settings.USE_BACKGROUND_TASK
or not instance.pk
@@ -569,7 +580,7 @@ def cached_label_changed(sender, **kwargs):
if not force_update and getattr(instance, "_cached_label_checked", False):
return
return load_task(_cached_label_changed, "cached_label_changed", None, sender,
- **kwargs)
+ queue=queue, **kwargs)
@task()
@@ -590,6 +601,7 @@ def _cached_label_changed(sender, **kwargs):
return
instance.__class__.objects.filter(pk=instance.pk).update(timestamp_label=instance._timestamp)
+ instance._queue = kwargs.get("queue", settings.CELERY_DEFAULT_QUEUE)
logger.debug(f"[ishtar] ishtar_common.utils._cached_label_changed - {instance.__class__.__name__} - {instance.pk} - {instance}")
if hasattr(instance, "refresh_cache"):
instance.refresh_cache()
@@ -666,8 +678,9 @@ def external_id_changed(sender, **kwargs):
return
if getattr(instance, "_external_id_checked", None):
return
+ queue = getattr(instance, "_queue", settings.CELERY_DEFAULT_QUEUE)
return load_task(_external_id_changed, "external_id_changed",
- ["_external_id_changed"], sender, **kwargs)
+ ["_external_id_changed"], sender, queue=queue, **kwargs)
@task()
@@ -679,6 +692,7 @@ def _external_id_changed(sender, **kwargs):
return
if getattr(instance, "_external_id_checked", None):
return
+ instance._queue = kwargs.get("queue", settings.CELERY_DEFAULT_QUEUE)
updated = False
if not instance.external_id or instance.auto_external_id:
external_id = get_generated_id(instance.SLUG + "_external_id", instance)
@@ -839,8 +853,12 @@ def get_srid_obj_from_point(point):
def post_save_geodata(sender, **kwargs):
+ instance = kwargs["instance"]
+ if not instance:
+ return
+ queue = getattr(instance, "_queue", settings.CELERY_DEFAULT_QUEUE)
return load_task(_post_save_geodata, "post_save_geodata", ["_no_geo_check"], sender,
- **kwargs)
+ queue=queue, **kwargs)
@task()
@@ -855,6 +873,7 @@ def _post_save_geodata(sender, **kwargs):
if getattr(instance, "_post_saved_geo", False):
return
+ instance._queue = kwargs.get("queue", settings.CELERY_DEFAULT_QUEUE)
modified = False
if getattr(instance, "post_save_geo", False):
# TODO: geovectordata -> no post_save_geo: delete?
@@ -904,8 +923,9 @@ def post_save_geo(sender, **kwargs):
return
if getattr(instance, "_post_saved_geo", False):
return
+ queue = getattr(instance, "_queue", settings.CELERY_DEFAULT_QUEUE)
return load_task(_post_save_geo, "post_save_geo", ["_no_geo_check"],
- sender, **kwargs)
+ sender, queue=queue, **kwargs)
@task()
@@ -928,6 +948,7 @@ def _post_save_geo(sender, **kwargs):
return
instance.__class__.objects.filter(pk=instance.pk).update(timestamp_geo=instance._timestamp)
+ instance._queue = kwargs.get("queue", settings.CELERY_DEFAULT_QUEUE)
logger.debug(f"[ishtar] ishtar_common.utils._post_save_geo - {instance.__class__.__name__} - {instance.pk} - {instance}")
instance._post_saved_geo = True
@@ -1627,6 +1648,7 @@ def m2m_historization_changed(sender, **kwargs):
obj = kwargs.get("instance", None)
if not obj:
return
+ obj._queue = kwargs.get("queue", settings.CELERY_DEFAULT_QUEUE)
hist_values = obj.history_m2m or {}
for attr in obj.HISTORICAL_M2M:
values = []
diff --git a/ishtar_common/utils_celery.py b/ishtar_common/utils_celery.py
new file mode 100644
index 000000000..c2bf28ee9
--- /dev/null
+++ b/ishtar_common/utils_celery.py
@@ -0,0 +1,9 @@
+from kombu import Exchange, Queue
+
+
+def config_celery_queue(celery_conf):
+ task_queues = (
+ Queue('low_priority', Exchange('low_priority'), routing_key='low_priority'),
+ Queue('celery', Exchange('celery'), routing_key='celery'),
+ )
+ celery_conf.task_queues = task_queues