diff options
Diffstat (limited to 'ishtar_common')
-rw-r--r-- | ishtar_common/data_importer.py | 10 | ||||
-rw-r--r-- | ishtar_common/models_common.py | 2 | ||||
-rw-r--r-- | ishtar_common/utils.py | 36 | ||||
-rw-r--r-- | ishtar_common/utils_celery.py | 9 |
4 files changed, 50 insertions, 7 deletions
diff --git a/ishtar_common/data_importer.py b/ishtar_common/data_importer.py index 7bbb0753c..6a58e0409 100644 --- a/ishtar_common/data_importer.py +++ b/ishtar_common/data_importer.py @@ -883,6 +883,7 @@ class Importer(object): # force django based post-processing for the item item = cls.objects.get(pk=pk) item._timestamp = self.timestamp + item._queue = "low_priority" item.save() if hasattr(item, "RELATED_POST_PROCESS"): for related_key in item.RELATED_POST_PROCESS: @@ -897,6 +898,7 @@ class Importer(object): try: item = cls.objects.get(pk=pk) item._timestamp = self.timestamp + item._queue = "low_priority" item.save() if hasattr(item, "fix"): # post save/m2m specific fix @@ -1105,6 +1107,7 @@ class Importer(object): def _create_item(self, cls, dct, idx_line): obj = cls(**dct) obj._no_post_save = True # delayed at the end of the import + obj._queue = "low_priority" obj.save() self._add_to_post_save(cls, obj.pk, idx_line) return obj @@ -1219,6 +1222,7 @@ class Importer(object): setattr(obj, k, data["defaults"][k]) obj._no_post_save = True obj._timestamp = self.timestamp + obj._queue = "low_priority" obj.save() self._add_to_post_save(obj.__class__, obj.pk, idx_line) @@ -1258,6 +1262,7 @@ class Importer(object): for k in geodata: setattr(item, k, geodata[k]) item._timestamp = self.timestamp + item._queue = "low_priority" item.save() else: item = GeoVectorData.objects.create(**geodata) @@ -1273,6 +1278,7 @@ class Importer(object): obj._post_saved_geo = True obj._no_move = True obj.skip_history_when_saving = True + obj._queue = "low_priority" obj.save() n = datetime.datetime.now() @@ -1345,6 +1351,7 @@ class Importer(object): setattr(t_obj, k, data["defaults"][k]) t_obj._no_post_save = True t_obj._timestamp = self.timestamp + t_obj._queue = "low_priority" t_obj.save() self._add_to_post_save(t_obj.__class__, t_obj.pk, idx_line) if self.import_instance and hasattr(t_obj, "imports") and created: @@ -1685,6 +1692,7 @@ class Importer(object): setattr(v, k, extra_fields[k]) if changed: v._timestamp = self.timestamp + v._queue = "low_priority" v.save() for att, objs in m2m_m2ms: if type(objs) not in (list, tuple): @@ -2034,6 +2042,7 @@ class Importer(object): for k in updated_dct: setattr(obj, k, updated_dct[k]) obj._timestamp = self.timestamp + obj._queue = "low_priority" obj.save() if ( not self.simulate @@ -2103,6 +2112,7 @@ class Importer(object): v._no_post_save = True try: v._timestamp = self.timestamp + v._queue = "low_priority" v.save() except DatabaseError as e: raise IntegrityError(e.message) diff --git a/ishtar_common/models_common.py b/ishtar_common/models_common.py index bde457a3c..ccc70d81c 100644 --- a/ishtar_common/models_common.py +++ b/ishtar_common/models_common.py @@ -3340,6 +3340,7 @@ class MainItem(ShortMenuItem, SerializeItem): return if getattr(self, "_no_down_model_update", False): return + queue = getattr(self, "_queue", settings.CELERY_DEFAULT_QUEUE) for down_model in self.DOWN_MODEL_UPDATE: if not settings.USE_BACKGROUND_TASK: rel = getattr(self, down_model) @@ -3349,6 +3350,7 @@ class MainItem(ShortMenuItem, SerializeItem): for item in getattr(self, down_model).all(): if hasattr(self, "_timestamp"): item._timestamp = self._timestamp + item._queue = queue if hasattr(item, "cached_label_changed"): item.cached_label_changed() if hasattr(item, "main_geodata"): diff --git a/ishtar_common/utils.py b/ishtar_common/utils.py index 074dc9ef7..621e96b11 100644 --- a/ishtar_common/utils.py +++ b/ishtar_common/utils.py @@ -125,6 +125,12 @@ if settings.USE_BACKGROUND_TASK: pass +@task() +def test_task(arg): + print("Running task with priority: {}".format(arg)) + time.sleep(3) + + class BColors: """ Bash colors. Don't forget to finish your colored string with ENDC. @@ -514,7 +520,9 @@ def revoke_old_task(kwargs, action_name, task_id, instance_cls): cache.set(key, task_id, settings.CACHE_TIMEOUT * 4) -def load_task(task_func, task_name, checks, sender, **kwargs): +def load_task(task_func, task_name, checks, sender, queue=None, **kwargs): + if not queue: + queue = settings.CELERY_DEFAULT_QUEUE instance = kwargs.get("instance", None) if not instance: return @@ -540,7 +548,8 @@ def load_task(task_func, task_name, checks, sender, **kwargs): sender, kwargs = serialize_args_for_tasks( sender, instance, kwargs, EXTRA_KWARGS_TRIGGER ) - task_item = task_func.delay(sender, **kwargs) + kwargs["queue"] = queue + task_item = task_func.apply_async([sender], kwargs, queue=queue) revoke_old_task(kwargs, task_name, task_item.id, instance.__class__) return task_item @@ -554,9 +563,11 @@ def cached_label_changed(sender, **kwargs): if hasattr(instance, "_timestamp") and hasattr(instance, "timestamp_label") and ( instance.timestamp_label or 0) >= (instance._timestamp or 0): return + queue = getattr(instance, "_queue", settings.CELERY_DEFAULT_QUEUE) if hasattr(instance, "external_id") and hasattr(instance, "auto_external_id") \ and hasattr(instance, "SLUG") and not getattr(instance, "_external_id_checked", None): - changed = load_task(_external_id_changed, "external_id_changed", None, sender, **kwargs) + changed = load_task(_external_id_changed, "external_id_changed", None, sender, + queue=queue, **kwargs) if changed and ( not settings.USE_BACKGROUND_TASK or not instance.pk @@ -570,7 +581,7 @@ def cached_label_changed(sender, **kwargs): if not force_update and getattr(instance, "_cached_label_checked", False): return return load_task(_cached_label_changed, "cached_label_changed", None, sender, - **kwargs) + queue=queue, **kwargs) @task() @@ -591,6 +602,7 @@ def _cached_label_changed(sender, **kwargs): return instance.__class__.objects.filter(pk=instance.pk).update(timestamp_label=instance._timestamp) + instance._queue = kwargs.get("queue", settings.CELERY_DEFAULT_QUEUE) logger.debug(f"[ishtar] ishtar_common.utils._cached_label_changed - {instance.__class__.__name__} - {instance.pk} - {instance}") if hasattr(instance, "refresh_cache"): instance.refresh_cache() @@ -667,8 +679,9 @@ def external_id_changed(sender, **kwargs): return if getattr(instance, "_external_id_checked", None): return + queue = getattr(instance, "_queue", settings.CELERY_DEFAULT_QUEUE) return load_task(_external_id_changed, "external_id_changed", - ["_external_id_changed"], sender, **kwargs) + ["_external_id_changed"], sender, queue=queue, **kwargs) @task() @@ -680,6 +693,7 @@ def _external_id_changed(sender, **kwargs): return if getattr(instance, "_external_id_checked", None): return + instance._queue = kwargs.get("queue", settings.CELERY_DEFAULT_QUEUE) updated = False if not instance.external_id or instance.auto_external_id: external_id = get_generated_id(instance.SLUG + "_external_id", instance) @@ -840,8 +854,12 @@ def get_srid_obj_from_point(point): def post_save_geodata(sender, **kwargs): + instance = kwargs["instance"] + if not instance: + return + queue = getattr(instance, "_queue", settings.CELERY_DEFAULT_QUEUE) return load_task(_post_save_geodata, "post_save_geodata", ["_no_geo_check"], sender, - **kwargs) + queue=queue, **kwargs) @task() @@ -856,6 +874,7 @@ def _post_save_geodata(sender, **kwargs): if getattr(instance, "_post_saved_geo", False): return + instance._queue = kwargs.get("queue", settings.CELERY_DEFAULT_QUEUE) modified = False if getattr(instance, "post_save_geo", False): # TODO: geovectordata -> no post_save_geo: delete? @@ -905,8 +924,9 @@ def post_save_geo(sender, **kwargs): return if getattr(instance, "_post_saved_geo", False): return + queue = getattr(instance, "_queue", settings.CELERY_DEFAULT_QUEUE) return load_task(_post_save_geo, "post_save_geo", ["_no_geo_check"], - sender, **kwargs) + sender, queue=queue, **kwargs) @task() @@ -929,6 +949,7 @@ def _post_save_geo(sender, **kwargs): return instance.__class__.objects.filter(pk=instance.pk).update(timestamp_geo=instance._timestamp) + instance._queue = kwargs.get("queue", settings.CELERY_DEFAULT_QUEUE) logger.debug(f"[ishtar] ishtar_common.utils._post_save_geo - {instance.__class__.__name__} - {instance.pk} - {instance}") instance._post_saved_geo = True @@ -1628,6 +1649,7 @@ def m2m_historization_changed(sender, **kwargs): obj = kwargs.get("instance", None) if not obj: return + obj._queue = kwargs.get("queue", settings.CELERY_DEFAULT_QUEUE) hist_values = obj.history_m2m or {} for attr in obj.HISTORICAL_M2M: values = [] diff --git a/ishtar_common/utils_celery.py b/ishtar_common/utils_celery.py new file mode 100644 index 000000000..c2bf28ee9 --- /dev/null +++ b/ishtar_common/utils_celery.py @@ -0,0 +1,9 @@ +from kombu import Exchange, Queue + + +def config_celery_queue(celery_conf): + task_queues = ( + Queue('low_priority', Exchange('low_priority'), routing_key='low_priority'), + Queue('celery', Exchange('celery'), routing_key='celery'), + ) + celery_conf.task_queues = task_queues |