diff options
Diffstat (limited to 'ishtar_common/utils.py')
-rw-r--r-- | ishtar_common/utils.py | 77 |
1 files changed, 64 insertions, 13 deletions
diff --git a/ishtar_common/utils.py b/ishtar_common/utils.py index cf58216a4..bcba9c52a 100644 --- a/ishtar_common/utils.py +++ b/ishtar_common/utils.py @@ -1123,14 +1123,51 @@ def load_task(task_func, task_name, checks, sender, queue=None, **kwargs): return task_item +def load_query_task(task_func, task_name, sender, queue=None, **kwargs): + if not queue: + queue = settings.CELERY_DEFAULT_QUEUE + query = kwargs.get("query", None) + if not query: + return + + if not settings.USE_BACKGROUND_TASK: + # no background task + return task_func(sender, **kwargs) + + sender, kwargs = serialize_args_for_tasks( + sender, query, kwargs, EXTRA_KWARGS_TRIGGER + ) + kwargs["queue"] = queue + task_item = task_func.apply_async([sender], kwargs, queue=queue) + revoke_old_task(kwargs, task_name, task_item.id, sender) + return task_item + + +def bulk_item_changed(sender, **kwargs): + load_query_task(_bulk_item_changed, "bulk_item_changed", sender, "low_priority") + + +@task() +def _bulk_item_changed(sender, **kwargs): + sender, query = deserialize_args_for_tasks(sender, kwargs, EXTRA_KWARGS_TRIGGER) + if not query: + return + for item in sender.objects.filter(**query).all(): + kwargs["instance"] = item + cached_label_changed(sender, **kwargs) + post_save_geo(sender, **kwargs) + + def cached_label_changed(sender, **kwargs): if "instance" not in kwargs: return instance = kwargs["instance"] if not instance: return - if hasattr(instance, "_timestamp") and hasattr(instance, "timestamp_label") and ( - instance.timestamp_label or 0) >= (instance._timestamp or 0): + timestamp = kwargs.get("timestamp", None) or getattr(instance, "_timestamp", None) + if not timestamp: + timestamp = int(datetime.datetime.now().timestamp()) + if hasattr(instance, "timestamp_label") and (instance.timestamp_label or 0) >= timestamp: return queue = getattr(instance, "_queue", settings.CELERY_DEFAULT_QUEUE) if hasattr(instance, "external_id") and hasattr(instance, "auto_external_id") \ @@ -1160,10 +1197,15 @@ def _cached_label_changed(sender, **kwargs): if not force_update and getattr(instance, "_cached_label_checked", False): return - if hasattr(instance, "_timestamp") and hasattr(instance, "timestamp_label"): - if (instance.timestamp_label or 0) >= (instance._timestamp or 0): + timestamp = kwargs.get("timestamp", None) + if not timestamp: + timestamp = int(datetime.datetime.now().timestamp()) + if hasattr(instance, "timestamp_label"): + if (instance.timestamp_label or 0) >= timestamp: return - instance.__class__.objects.filter(pk=instance.pk).update(timestamp_label=instance._timestamp) + instance.__class__.objects.filter(pk=instance.pk).update( + timestamp_label=timestamp + ) instance._queue = kwargs.get("queue", settings.CELERY_DEFAULT_QUEUE) logger.debug(f"[ishtar] ishtar_common.utils._cached_label_changed - {instance.__class__.__name__} - {instance.pk} - {instance}") @@ -1197,7 +1239,7 @@ def _cached_label_changed(sender, **kwargs): instance.__class__.objects.filter(pk=instance.pk).update(**dict(changed)) if ((getattr(instance, "check_cascade_update", False) and instance.check_cascade_update()) or changed or not cached_labels) and hasattr(instance, "cascade_update"): - instance.cascade_update() + instance.cascade_update(timestamp=timestamp) updated = False if force_update or hasattr(instance, "update_search_vector"): updated = instance.update_search_vector() @@ -1206,9 +1248,7 @@ def _cached_label_changed(sender, **kwargs): item._cascade_change = True if hasattr(instance, "test_obj"): item.test_obj = instance.test_obj - if instance.timestamp_label: - item._timestamp = instance.timestamp_label - cached_label_changed(item.__class__, instance=item) + cached_label_changed(item.__class__, instance=item, timestamp=timestamp) cache_key, __ = get_cache(sender, ["cached_label_changed", instance.pk]) cache.set(cache_key, None, settings.CACHE_TASK_TIMEOUT) if cached_labels: @@ -1495,6 +1535,12 @@ def post_save_geo(sender, **kwargs): return if getattr(instance, "_post_saved_geo", False): return + timestamp = kwargs.get("timestamp", None) or getattr(instance, "_timestamp", None) + if not timestamp: + timestamp = int(datetime.datetime.now().timestamp()) + elif (instance.timestamp_geo or 0) >= timestamp: + return + queue = getattr(instance, "_queue", settings.CELERY_DEFAULT_QUEUE) return load_task(_post_save_geo, "post_save_geo", ["_no_geo_check"], sender, queue=queue, **kwargs) @@ -1515,10 +1561,15 @@ def _post_save_geo(sender, **kwargs): if getattr(instance, "_post_saved_geo", False): return - if hasattr(instance, "_timestamp") and hasattr(instance, "timestamp_geo"): - if (instance.timestamp_label or 0) >= (instance._timestamp or 0): + timestamp = kwargs.get("timestamp", None) + if hasattr(instance, "timestamp_geo"): + if timestamp and (instance.timestamp_geo or 0) >= timestamp: return - instance.__class__.objects.filter(pk=instance.pk).update(timestamp_geo=instance._timestamp) + if not timestamp: + timestamp = int(datetime.datetime.now().timestamp()) + instance.__class__.objects.filter(pk=instance.pk).update( + timestamp_geo=timestamp + ) instance._queue = kwargs.get("queue", settings.CELERY_DEFAULT_QUEUE) logger.debug(f"[ishtar] ishtar_common.utils._post_save_geo - {instance.__class__.__name__} - {instance.pk} - {instance}") @@ -1541,7 +1592,7 @@ def _post_save_geo(sender, **kwargs): instance._cached_label_checked = False instance.save() if hasattr(instance, "cascade_update"): - instance.cascade_update() + instance.cascade_update(timestamp=timestamp) cache_key, __ = get_cache(sender, ["post_save_geo", instance.pk]) cache.set(cache_key, None, settings.CACHE_TASK_TIMEOUT) return |