summaryrefslogtreecommitdiff
path: root/ishtar_common
diff options
context:
space:
mode:
Diffstat (limited to 'ishtar_common')
-rw-r--r--ishtar_common/data_importer.py10
-rw-r--r--ishtar_common/models_common.py2
-rw-r--r--ishtar_common/utils.py36
-rw-r--r--ishtar_common/utils_celery.py9
4 files changed, 50 insertions, 7 deletions
diff --git a/ishtar_common/data_importer.py b/ishtar_common/data_importer.py
index 7bbb0753c..6a58e0409 100644
--- a/ishtar_common/data_importer.py
+++ b/ishtar_common/data_importer.py
@@ -883,6 +883,7 @@ class Importer(object):
# force django based post-processing for the item
item = cls.objects.get(pk=pk)
item._timestamp = self.timestamp
+ item._queue = "low_priority"
item.save()
if hasattr(item, "RELATED_POST_PROCESS"):
for related_key in item.RELATED_POST_PROCESS:
@@ -897,6 +898,7 @@ class Importer(object):
try:
item = cls.objects.get(pk=pk)
item._timestamp = self.timestamp
+ item._queue = "low_priority"
item.save()
if hasattr(item, "fix"):
# post save/m2m specific fix
@@ -1105,6 +1107,7 @@ class Importer(object):
def _create_item(self, cls, dct, idx_line):
obj = cls(**dct)
obj._no_post_save = True # delayed at the end of the import
+ obj._queue = "low_priority"
obj.save()
self._add_to_post_save(cls, obj.pk, idx_line)
return obj
@@ -1219,6 +1222,7 @@ class Importer(object):
setattr(obj, k, data["defaults"][k])
obj._no_post_save = True
obj._timestamp = self.timestamp
+ obj._queue = "low_priority"
obj.save()
self._add_to_post_save(obj.__class__, obj.pk, idx_line)
@@ -1258,6 +1262,7 @@ class Importer(object):
for k in geodata:
setattr(item, k, geodata[k])
item._timestamp = self.timestamp
+ item._queue = "low_priority"
item.save()
else:
item = GeoVectorData.objects.create(**geodata)
@@ -1273,6 +1278,7 @@ class Importer(object):
obj._post_saved_geo = True
obj._no_move = True
obj.skip_history_when_saving = True
+ obj._queue = "low_priority"
obj.save()
n = datetime.datetime.now()
@@ -1345,6 +1351,7 @@ class Importer(object):
setattr(t_obj, k, data["defaults"][k])
t_obj._no_post_save = True
t_obj._timestamp = self.timestamp
+ t_obj._queue = "low_priority"
t_obj.save()
self._add_to_post_save(t_obj.__class__, t_obj.pk, idx_line)
if self.import_instance and hasattr(t_obj, "imports") and created:
@@ -1685,6 +1692,7 @@ class Importer(object):
setattr(v, k, extra_fields[k])
if changed:
v._timestamp = self.timestamp
+ v._queue = "low_priority"
v.save()
for att, objs in m2m_m2ms:
if type(objs) not in (list, tuple):
@@ -2034,6 +2042,7 @@ class Importer(object):
for k in updated_dct:
setattr(obj, k, updated_dct[k])
obj._timestamp = self.timestamp
+ obj._queue = "low_priority"
obj.save()
if (
not self.simulate
@@ -2103,6 +2112,7 @@ class Importer(object):
v._no_post_save = True
try:
v._timestamp = self.timestamp
+ v._queue = "low_priority"
v.save()
except DatabaseError as e:
raise IntegrityError(e.message)
diff --git a/ishtar_common/models_common.py b/ishtar_common/models_common.py
index bde457a3c..ccc70d81c 100644
--- a/ishtar_common/models_common.py
+++ b/ishtar_common/models_common.py
@@ -3340,6 +3340,7 @@ class MainItem(ShortMenuItem, SerializeItem):
return
if getattr(self, "_no_down_model_update", False):
return
+ queue = getattr(self, "_queue", settings.CELERY_DEFAULT_QUEUE)
for down_model in self.DOWN_MODEL_UPDATE:
if not settings.USE_BACKGROUND_TASK:
rel = getattr(self, down_model)
@@ -3349,6 +3350,7 @@ class MainItem(ShortMenuItem, SerializeItem):
for item in getattr(self, down_model).all():
if hasattr(self, "_timestamp"):
item._timestamp = self._timestamp
+ item._queue = queue
if hasattr(item, "cached_label_changed"):
item.cached_label_changed()
if hasattr(item, "main_geodata"):
diff --git a/ishtar_common/utils.py b/ishtar_common/utils.py
index 074dc9ef7..621e96b11 100644
--- a/ishtar_common/utils.py
+++ b/ishtar_common/utils.py
@@ -125,6 +125,12 @@ if settings.USE_BACKGROUND_TASK:
pass
+@task()
+def test_task(arg):
+ print("Running task with priority: {}".format(arg))
+ time.sleep(3)
+
+
class BColors:
"""
Bash colors. Don't forget to finish your colored string with ENDC.
@@ -514,7 +520,9 @@ def revoke_old_task(kwargs, action_name, task_id, instance_cls):
cache.set(key, task_id, settings.CACHE_TIMEOUT * 4)
-def load_task(task_func, task_name, checks, sender, **kwargs):
+def load_task(task_func, task_name, checks, sender, queue=None, **kwargs):
+ if not queue:
+ queue = settings.CELERY_DEFAULT_QUEUE
instance = kwargs.get("instance", None)
if not instance:
return
@@ -540,7 +548,8 @@ def load_task(task_func, task_name, checks, sender, **kwargs):
sender, kwargs = serialize_args_for_tasks(
sender, instance, kwargs, EXTRA_KWARGS_TRIGGER
)
- task_item = task_func.delay(sender, **kwargs)
+ kwargs["queue"] = queue
+ task_item = task_func.apply_async([sender], kwargs, queue=queue)
revoke_old_task(kwargs, task_name, task_item.id, instance.__class__)
return task_item
@@ -554,9 +563,11 @@ def cached_label_changed(sender, **kwargs):
if hasattr(instance, "_timestamp") and hasattr(instance, "timestamp_label") and (
instance.timestamp_label or 0) >= (instance._timestamp or 0):
return
+ queue = getattr(instance, "_queue", settings.CELERY_DEFAULT_QUEUE)
if hasattr(instance, "external_id") and hasattr(instance, "auto_external_id") \
and hasattr(instance, "SLUG") and not getattr(instance, "_external_id_checked", None):
- changed = load_task(_external_id_changed, "external_id_changed", None, sender, **kwargs)
+ changed = load_task(_external_id_changed, "external_id_changed", None, sender,
+ queue=queue, **kwargs)
if changed and (
not settings.USE_BACKGROUND_TASK
or not instance.pk
@@ -570,7 +581,7 @@ def cached_label_changed(sender, **kwargs):
if not force_update and getattr(instance, "_cached_label_checked", False):
return
return load_task(_cached_label_changed, "cached_label_changed", None, sender,
- **kwargs)
+ queue=queue, **kwargs)
@task()
@@ -591,6 +602,7 @@ def _cached_label_changed(sender, **kwargs):
return
instance.__class__.objects.filter(pk=instance.pk).update(timestamp_label=instance._timestamp)
+ instance._queue = kwargs.get("queue", settings.CELERY_DEFAULT_QUEUE)
logger.debug(f"[ishtar] ishtar_common.utils._cached_label_changed - {instance.__class__.__name__} - {instance.pk} - {instance}")
if hasattr(instance, "refresh_cache"):
instance.refresh_cache()
@@ -667,8 +679,9 @@ def external_id_changed(sender, **kwargs):
return
if getattr(instance, "_external_id_checked", None):
return
+ queue = getattr(instance, "_queue", settings.CELERY_DEFAULT_QUEUE)
return load_task(_external_id_changed, "external_id_changed",
- ["_external_id_changed"], sender, **kwargs)
+ ["_external_id_changed"], sender, queue=queue, **kwargs)
@task()
@@ -680,6 +693,7 @@ def _external_id_changed(sender, **kwargs):
return
if getattr(instance, "_external_id_checked", None):
return
+ instance._queue = kwargs.get("queue", settings.CELERY_DEFAULT_QUEUE)
updated = False
if not instance.external_id or instance.auto_external_id:
external_id = get_generated_id(instance.SLUG + "_external_id", instance)
@@ -840,8 +854,12 @@ def get_srid_obj_from_point(point):
def post_save_geodata(sender, **kwargs):
+ instance = kwargs["instance"]
+ if not instance:
+ return
+ queue = getattr(instance, "_queue", settings.CELERY_DEFAULT_QUEUE)
return load_task(_post_save_geodata, "post_save_geodata", ["_no_geo_check"], sender,
- **kwargs)
+ queue=queue, **kwargs)
@task()
@@ -856,6 +874,7 @@ def _post_save_geodata(sender, **kwargs):
if getattr(instance, "_post_saved_geo", False):
return
+ instance._queue = kwargs.get("queue", settings.CELERY_DEFAULT_QUEUE)
modified = False
if getattr(instance, "post_save_geo", False):
# TODO: geovectordata -> no post_save_geo: delete?
@@ -905,8 +924,9 @@ def post_save_geo(sender, **kwargs):
return
if getattr(instance, "_post_saved_geo", False):
return
+ queue = getattr(instance, "_queue", settings.CELERY_DEFAULT_QUEUE)
return load_task(_post_save_geo, "post_save_geo", ["_no_geo_check"],
- sender, **kwargs)
+ sender, queue=queue, **kwargs)
@task()
@@ -929,6 +949,7 @@ def _post_save_geo(sender, **kwargs):
return
instance.__class__.objects.filter(pk=instance.pk).update(timestamp_geo=instance._timestamp)
+ instance._queue = kwargs.get("queue", settings.CELERY_DEFAULT_QUEUE)
logger.debug(f"[ishtar] ishtar_common.utils._post_save_geo - {instance.__class__.__name__} - {instance.pk} - {instance}")
instance._post_saved_geo = True
@@ -1628,6 +1649,7 @@ def m2m_historization_changed(sender, **kwargs):
obj = kwargs.get("instance", None)
if not obj:
return
+ obj._queue = kwargs.get("queue", settings.CELERY_DEFAULT_QUEUE)
hist_values = obj.history_m2m or {}
for attr in obj.HISTORICAL_M2M:
values = []
diff --git a/ishtar_common/utils_celery.py b/ishtar_common/utils_celery.py
new file mode 100644
index 000000000..c2bf28ee9
--- /dev/null
+++ b/ishtar_common/utils_celery.py
@@ -0,0 +1,9 @@
+from kombu import Exchange, Queue
+
+
+def config_celery_queue(celery_conf):
+ task_queues = (
+ Queue('low_priority', Exchange('low_priority'), routing_key='low_priority'),
+ Queue('celery', Exchange('celery'), routing_key='celery'),
+ )
+ celery_conf.task_queues = task_queues