summaryrefslogtreecommitdiff
path: root/ishtar_common/utils.py
diff options
context:
space:
mode:
Diffstat (limited to 'ishtar_common/utils.py')
-rw-r--r--ishtar_common/utils.py77
1 files changed, 64 insertions, 13 deletions
diff --git a/ishtar_common/utils.py b/ishtar_common/utils.py
index cf58216a4..bcba9c52a 100644
--- a/ishtar_common/utils.py
+++ b/ishtar_common/utils.py
@@ -1123,14 +1123,51 @@ def load_task(task_func, task_name, checks, sender, queue=None, **kwargs):
return task_item
+def load_query_task(task_func, task_name, sender, queue=None, **kwargs):
+ if not queue:
+ queue = settings.CELERY_DEFAULT_QUEUE
+ query = kwargs.get("query", None)
+ if not query:
+ return
+
+ if not settings.USE_BACKGROUND_TASK:
+ # no background task
+ return task_func(sender, **kwargs)
+
+ sender, kwargs = serialize_args_for_tasks(
+ sender, query, kwargs, EXTRA_KWARGS_TRIGGER
+ )
+ kwargs["queue"] = queue
+ task_item = task_func.apply_async([sender], kwargs, queue=queue)
+ revoke_old_task(kwargs, task_name, task_item.id, sender)
+ return task_item
+
+
+def bulk_item_changed(sender, **kwargs):
+ load_query_task(_bulk_item_changed, "bulk_item_changed", sender, "low_priority")
+
+
+@task()
+def _bulk_item_changed(sender, **kwargs):
+ sender, query = deserialize_args_for_tasks(sender, kwargs, EXTRA_KWARGS_TRIGGER)
+ if not query:
+ return
+ for item in sender.objects.filter(**query).all():
+ kwargs["instance"] = item
+ cached_label_changed(sender, **kwargs)
+ post_save_geo(sender, **kwargs)
+
+
def cached_label_changed(sender, **kwargs):
if "instance" not in kwargs:
return
instance = kwargs["instance"]
if not instance:
return
- if hasattr(instance, "_timestamp") and hasattr(instance, "timestamp_label") and (
- instance.timestamp_label or 0) >= (instance._timestamp or 0):
+ timestamp = kwargs.get("timestamp", None) or getattr(instance, "_timestamp", None)
+ if not timestamp:
+ timestamp = int(datetime.datetime.now().timestamp())
+ if hasattr(instance, "timestamp_label") and (instance.timestamp_label or 0) >= timestamp:
return
queue = getattr(instance, "_queue", settings.CELERY_DEFAULT_QUEUE)
if hasattr(instance, "external_id") and hasattr(instance, "auto_external_id") \
@@ -1160,10 +1197,15 @@ def _cached_label_changed(sender, **kwargs):
if not force_update and getattr(instance, "_cached_label_checked", False):
return
- if hasattr(instance, "_timestamp") and hasattr(instance, "timestamp_label"):
- if (instance.timestamp_label or 0) >= (instance._timestamp or 0):
+ timestamp = kwargs.get("timestamp", None)
+ if not timestamp:
+ timestamp = int(datetime.datetime.now().timestamp())
+ if hasattr(instance, "timestamp_label"):
+ if (instance.timestamp_label or 0) >= timestamp:
return
- instance.__class__.objects.filter(pk=instance.pk).update(timestamp_label=instance._timestamp)
+ instance.__class__.objects.filter(pk=instance.pk).update(
+ timestamp_label=timestamp
+ )
instance._queue = kwargs.get("queue", settings.CELERY_DEFAULT_QUEUE)
logger.debug(f"[ishtar] ishtar_common.utils._cached_label_changed - {instance.__class__.__name__} - {instance.pk} - {instance}")
@@ -1197,7 +1239,7 @@ def _cached_label_changed(sender, **kwargs):
instance.__class__.objects.filter(pk=instance.pk).update(**dict(changed))
if ((getattr(instance, "check_cascade_update", False) and instance.check_cascade_update())
or changed or not cached_labels) and hasattr(instance, "cascade_update"):
- instance.cascade_update()
+ instance.cascade_update(timestamp=timestamp)
updated = False
if force_update or hasattr(instance, "update_search_vector"):
updated = instance.update_search_vector()
@@ -1206,9 +1248,7 @@ def _cached_label_changed(sender, **kwargs):
item._cascade_change = True
if hasattr(instance, "test_obj"):
item.test_obj = instance.test_obj
- if instance.timestamp_label:
- item._timestamp = instance.timestamp_label
- cached_label_changed(item.__class__, instance=item)
+ cached_label_changed(item.__class__, instance=item, timestamp=timestamp)
cache_key, __ = get_cache(sender, ["cached_label_changed", instance.pk])
cache.set(cache_key, None, settings.CACHE_TASK_TIMEOUT)
if cached_labels:
@@ -1495,6 +1535,12 @@ def post_save_geo(sender, **kwargs):
return
if getattr(instance, "_post_saved_geo", False):
return
+ timestamp = kwargs.get("timestamp", None) or getattr(instance, "_timestamp", None)
+ if not timestamp:
+ timestamp = int(datetime.datetime.now().timestamp())
+ elif (instance.timestamp_geo or 0) >= timestamp:
+ return
+
queue = getattr(instance, "_queue", settings.CELERY_DEFAULT_QUEUE)
return load_task(_post_save_geo, "post_save_geo", ["_no_geo_check"],
sender, queue=queue, **kwargs)
@@ -1515,10 +1561,15 @@ def _post_save_geo(sender, **kwargs):
if getattr(instance, "_post_saved_geo", False):
return
- if hasattr(instance, "_timestamp") and hasattr(instance, "timestamp_geo"):
- if (instance.timestamp_label or 0) >= (instance._timestamp or 0):
+ timestamp = kwargs.get("timestamp", None)
+ if hasattr(instance, "timestamp_geo"):
+ if timestamp and (instance.timestamp_geo or 0) >= timestamp:
return
- instance.__class__.objects.filter(pk=instance.pk).update(timestamp_geo=instance._timestamp)
+ if not timestamp:
+ timestamp = int(datetime.datetime.now().timestamp())
+ instance.__class__.objects.filter(pk=instance.pk).update(
+ timestamp_geo=timestamp
+ )
instance._queue = kwargs.get("queue", settings.CELERY_DEFAULT_QUEUE)
logger.debug(f"[ishtar] ishtar_common.utils._post_save_geo - {instance.__class__.__name__} - {instance.pk} - {instance}")
@@ -1541,7 +1592,7 @@ def _post_save_geo(sender, **kwargs):
instance._cached_label_checked = False
instance.save()
if hasattr(instance, "cascade_update"):
- instance.cascade_update()
+ instance.cascade_update(timestamp=timestamp)
cache_key, __ = get_cache(sender, ["post_save_geo", instance.pk])
cache.set(cache_key, None, settings.CACHE_TASK_TIMEOUT)
return