diff options
author | Étienne Loks <etienne.loks@iggdrasil.net> | 2023-10-13 17:05:11 +0200 |
---|---|---|
committer | Étienne Loks <etienne.loks@iggdrasil.net> | 2024-02-05 10:49:37 +0100 |
commit | fb9ca2c2704768dcd8b1b2d27741a2c043834f56 (patch) | |
tree | 5a468a927f57339a4b3467964b2cfb3ec88dd320 | |
parent | f6ff3b698c548098023135b97e62ec69a23bdd3c (diff) | |
download | Ishtar-fb9ca2c2704768dcd8b1b2d27741a2c043834f56.tar.bz2 Ishtar-fb9ca2c2704768dcd8b1b2d27741a2c043834f56.zip |
✨ background task: set a low priority queue for imports
-rw-r--r-- | Makefile.example | 4 | ||||
-rw-r--r-- | example_project/celery_app.py.sample | 4 | ||||
-rw-r--r-- | example_project/settings.py | 2 | ||||
-rw-r--r-- | ishtar_common/data_importer.py | 10 | ||||
-rw-r--r-- | ishtar_common/models_common.py | 2 | ||||
-rw-r--r-- | ishtar_common/utils.py | 36 | ||||
-rw-r--r-- | ishtar_common/utils_celery.py | 9 |
7 files changed, 57 insertions, 10 deletions
diff --git a/Makefile.example b/Makefile.example index 7773481e6..c23b333d5 100644 --- a/Makefile.example +++ b/Makefile.example @@ -183,7 +183,9 @@ runalt: ## run test server on port 9000 cd $(project); $(PYTHON) manage.py runserver 0.0.0.0:9000 runcelery: ## run a celery worker - celery -A example_project worker -l INFO # -l DEBUG -f debug-celery.log + celery -A example_project worker -Q celery -l INFO & + celery -A example_project worker -Q low_priority -l INFO + # -l DEBUG -f debug-celery.log blackd: ## blackd service $(VENV)bin/blackd diff --git a/example_project/celery_app.py.sample b/example_project/celery_app.py.sample index 81000bc14..ef8061cbf 100644 --- a/example_project/celery_app.py.sample +++ b/example_project/celery_app.py.sample @@ -1,6 +1,6 @@ import os from celery import Celery - +from ishtar_common.utils_celery import config_celery_queue # change example_project with your project name project_name = "example_project" @@ -9,6 +9,6 @@ project_name = "example_project" os.environ.setdefault('DJANGO_SETTINGS_MODULE', project_name + '.settings') app = Celery(project_name) - +config_celery_queue(app.conf) app.config_from_object('django.conf:settings', namespace='CELERY') app.autodiscover_tasks() diff --git a/example_project/settings.py b/example_project/settings.py index 3c0accc82..8e9077aad 100644 --- a/example_project/settings.py +++ b/example_project/settings.py @@ -410,6 +410,8 @@ if LOG_LEVEL: if logger.startswith("archaeological") or logger.startswith("ishtar"): LOGGING["loggers"][logger]["level"] = LOG_LEVEL +CELERY_DEFAULT_QUEUE = "celery" + if USE_BACKGROUND_TASK: if not CELERY_BROKER_URL: CELERY_BROKER_URL = "amqp://localhost" diff --git a/ishtar_common/data_importer.py b/ishtar_common/data_importer.py index 7bbb0753c..6a58e0409 100644 --- a/ishtar_common/data_importer.py +++ b/ishtar_common/data_importer.py @@ -883,6 +883,7 @@ class Importer(object): # force django based post-processing for the item item = cls.objects.get(pk=pk) item._timestamp = self.timestamp + item._queue = "low_priority" item.save() if hasattr(item, "RELATED_POST_PROCESS"): for related_key in item.RELATED_POST_PROCESS: @@ -897,6 +898,7 @@ class Importer(object): try: item = cls.objects.get(pk=pk) item._timestamp = self.timestamp + item._queue = "low_priority" item.save() if hasattr(item, "fix"): # post save/m2m specific fix @@ -1105,6 +1107,7 @@ class Importer(object): def _create_item(self, cls, dct, idx_line): obj = cls(**dct) obj._no_post_save = True # delayed at the end of the import + obj._queue = "low_priority" obj.save() self._add_to_post_save(cls, obj.pk, idx_line) return obj @@ -1219,6 +1222,7 @@ class Importer(object): setattr(obj, k, data["defaults"][k]) obj._no_post_save = True obj._timestamp = self.timestamp + obj._queue = "low_priority" obj.save() self._add_to_post_save(obj.__class__, obj.pk, idx_line) @@ -1258,6 +1262,7 @@ class Importer(object): for k in geodata: setattr(item, k, geodata[k]) item._timestamp = self.timestamp + item._queue = "low_priority" item.save() else: item = GeoVectorData.objects.create(**geodata) @@ -1273,6 +1278,7 @@ class Importer(object): obj._post_saved_geo = True obj._no_move = True obj.skip_history_when_saving = True + obj._queue = "low_priority" obj.save() n = datetime.datetime.now() @@ -1345,6 +1351,7 @@ class Importer(object): setattr(t_obj, k, data["defaults"][k]) t_obj._no_post_save = True t_obj._timestamp = self.timestamp + t_obj._queue = "low_priority" t_obj.save() self._add_to_post_save(t_obj.__class__, t_obj.pk, idx_line) if self.import_instance and hasattr(t_obj, "imports") and created: @@ -1685,6 +1692,7 @@ class Importer(object): setattr(v, k, extra_fields[k]) if changed: v._timestamp = self.timestamp + v._queue = "low_priority" v.save() for att, objs in m2m_m2ms: if type(objs) not in (list, tuple): @@ -2034,6 +2042,7 @@ class Importer(object): for k in updated_dct: setattr(obj, k, updated_dct[k]) obj._timestamp = self.timestamp + obj._queue = "low_priority" obj.save() if ( not self.simulate @@ -2103,6 +2112,7 @@ class Importer(object): v._no_post_save = True try: v._timestamp = self.timestamp + v._queue = "low_priority" v.save() except DatabaseError as e: raise IntegrityError(e.message) diff --git a/ishtar_common/models_common.py b/ishtar_common/models_common.py index bde457a3c..ccc70d81c 100644 --- a/ishtar_common/models_common.py +++ b/ishtar_common/models_common.py @@ -3340,6 +3340,7 @@ class MainItem(ShortMenuItem, SerializeItem): return if getattr(self, "_no_down_model_update", False): return + queue = getattr(self, "_queue", settings.CELERY_DEFAULT_QUEUE) for down_model in self.DOWN_MODEL_UPDATE: if not settings.USE_BACKGROUND_TASK: rel = getattr(self, down_model) @@ -3349,6 +3350,7 @@ class MainItem(ShortMenuItem, SerializeItem): for item in getattr(self, down_model).all(): if hasattr(self, "_timestamp"): item._timestamp = self._timestamp + item._queue = queue if hasattr(item, "cached_label_changed"): item.cached_label_changed() if hasattr(item, "main_geodata"): diff --git a/ishtar_common/utils.py b/ishtar_common/utils.py index d5ace858d..44d42f00f 100644 --- a/ishtar_common/utils.py +++ b/ishtar_common/utils.py @@ -124,6 +124,12 @@ if settings.USE_BACKGROUND_TASK: pass +@task() +def test_task(arg): + print("Running task with priority: {}".format(arg)) + time.sleep(3) + + class BColors: """ Bash colors. Don't forget to finish your colored string with ENDC. @@ -513,7 +519,9 @@ def revoke_old_task(kwargs, action_name, task_id, instance_cls): cache.set(key, task_id, settings.CACHE_TIMEOUT * 4) -def load_task(task_func, task_name, checks, sender, **kwargs): +def load_task(task_func, task_name, checks, sender, queue=None, **kwargs): + if not queue: + queue = settings.CELERY_DEFAULT_QUEUE instance = kwargs.get("instance", None) if not instance: return @@ -539,7 +547,8 @@ def load_task(task_func, task_name, checks, sender, **kwargs): sender, kwargs = serialize_args_for_tasks( sender, instance, kwargs, EXTRA_KWARGS_TRIGGER ) - task_item = task_func.delay(sender, **kwargs) + kwargs["queue"] = queue + task_item = task_func.apply_async([sender], kwargs, queue=queue) revoke_old_task(kwargs, task_name, task_item.id, instance.__class__) return task_item @@ -553,9 +562,11 @@ def cached_label_changed(sender, **kwargs): if hasattr(instance, "_timestamp") and hasattr(instance, "timestamp_label") and ( instance.timestamp_label or 0) >= (instance._timestamp or 0): return + queue = getattr(instance, "_queue", settings.CELERY_DEFAULT_QUEUE) if hasattr(instance, "external_id") and hasattr(instance, "auto_external_id") \ and hasattr(instance, "SLUG") and not getattr(instance, "_external_id_checked", None): - changed = load_task(_external_id_changed, "external_id_changed", None, sender, **kwargs) + changed = load_task(_external_id_changed, "external_id_changed", None, sender, + queue=queue, **kwargs) if changed and ( not settings.USE_BACKGROUND_TASK or not instance.pk @@ -569,7 +580,7 @@ def cached_label_changed(sender, **kwargs): if not force_update and getattr(instance, "_cached_label_checked", False): return return load_task(_cached_label_changed, "cached_label_changed", None, sender, - **kwargs) + queue=queue, **kwargs) @task() @@ -590,6 +601,7 @@ def _cached_label_changed(sender, **kwargs): return instance.__class__.objects.filter(pk=instance.pk).update(timestamp_label=instance._timestamp) + instance._queue = kwargs.get("queue", settings.CELERY_DEFAULT_QUEUE) logger.debug(f"[ishtar] ishtar_common.utils._cached_label_changed - {instance.__class__.__name__} - {instance.pk} - {instance}") if hasattr(instance, "refresh_cache"): instance.refresh_cache() @@ -666,8 +678,9 @@ def external_id_changed(sender, **kwargs): return if getattr(instance, "_external_id_checked", None): return + queue = getattr(instance, "_queue", settings.CELERY_DEFAULT_QUEUE) return load_task(_external_id_changed, "external_id_changed", - ["_external_id_changed"], sender, **kwargs) + ["_external_id_changed"], sender, queue=queue, **kwargs) @task() @@ -679,6 +692,7 @@ def _external_id_changed(sender, **kwargs): return if getattr(instance, "_external_id_checked", None): return + instance._queue = kwargs.get("queue", settings.CELERY_DEFAULT_QUEUE) updated = False if not instance.external_id or instance.auto_external_id: external_id = get_generated_id(instance.SLUG + "_external_id", instance) @@ -839,8 +853,12 @@ def get_srid_obj_from_point(point): def post_save_geodata(sender, **kwargs): + instance = kwargs["instance"] + if not instance: + return + queue = getattr(instance, "_queue", settings.CELERY_DEFAULT_QUEUE) return load_task(_post_save_geodata, "post_save_geodata", ["_no_geo_check"], sender, - **kwargs) + queue=queue, **kwargs) @task() @@ -855,6 +873,7 @@ def _post_save_geodata(sender, **kwargs): if getattr(instance, "_post_saved_geo", False): return + instance._queue = kwargs.get("queue", settings.CELERY_DEFAULT_QUEUE) modified = False if getattr(instance, "post_save_geo", False): # TODO: geovectordata -> no post_save_geo: delete? @@ -904,8 +923,9 @@ def post_save_geo(sender, **kwargs): return if getattr(instance, "_post_saved_geo", False): return + queue = getattr(instance, "_queue", settings.CELERY_DEFAULT_QUEUE) return load_task(_post_save_geo, "post_save_geo", ["_no_geo_check"], - sender, **kwargs) + sender, queue=queue, **kwargs) @task() @@ -928,6 +948,7 @@ def _post_save_geo(sender, **kwargs): return instance.__class__.objects.filter(pk=instance.pk).update(timestamp_geo=instance._timestamp) + instance._queue = kwargs.get("queue", settings.CELERY_DEFAULT_QUEUE) logger.debug(f"[ishtar] ishtar_common.utils._post_save_geo - {instance.__class__.__name__} - {instance.pk} - {instance}") instance._post_saved_geo = True @@ -1627,6 +1648,7 @@ def m2m_historization_changed(sender, **kwargs): obj = kwargs.get("instance", None) if not obj: return + obj._queue = kwargs.get("queue", settings.CELERY_DEFAULT_QUEUE) hist_values = obj.history_m2m or {} for attr in obj.HISTORICAL_M2M: values = [] diff --git a/ishtar_common/utils_celery.py b/ishtar_common/utils_celery.py new file mode 100644 index 000000000..c2bf28ee9 --- /dev/null +++ b/ishtar_common/utils_celery.py @@ -0,0 +1,9 @@ +from kombu import Exchange, Queue + + +def config_celery_queue(celery_conf): + task_queues = ( + Queue('low_priority', Exchange('low_priority'), routing_key='low_priority'), + Queue('celery', Exchange('celery'), routing_key='celery'), + ) + celery_conf.task_queues = task_queues |