Avoiding repeated execution of Celery scheduled tasks with celery_once

  • 2021-12-11 18:21:46
  • OfStack

While using Celery to count daily visits, I noticed that one task was being executed twice: two identical tasks were dispatched within the same second, so two workers picked them up and the visits were counted twice. The root cause could not be found directly.

Reference: https://www.ofstack.com/article/226849.htm

Some people solve this with a distributed lock implemented on Redis; others use Celery Once.
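The Redis-lock idea can be sketched without a Redis server. Below is a hypothetical, in-process stand-in that replaces the Redis keyspace with a dict but keeps the same acquire-if-absent-with-expiry semantics (what Redis does atomically with `SET key value NX EX timeout`); the names `acquire_lock`/`release_lock` are illustrative, not from any library:

```python
import time

# Hypothetical stand-in for the Redis keyspace: lock key -> expiry timestamp.
_locks = {}

def acquire_lock(key, timeout=60 * 60):
    """Take the lock if it is free or expired (mimics Redis SET NX EX)."""
    now = time.time()
    expiry = _locks.get(key)
    if expiry is not None and expiry > now:
        return False           # another worker already holds the lock
    _locks[key] = now + timeout
    return True

def release_lock(key):
    _locks.pop(key, None)

# Two workers racing to run the same task: only the first wins the lock.
first = acquire_lock("count_daily_visits")    # True
second = acquire_lock("count_daily_visits")   # False: duplicate is skipped
release_lock("count_daily_visits")
```

With real Redis the acquire step is a single atomic `SET ... NX EX` command, so it works across processes and machines; the dict version only illustrates the semantics within one process.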

Celery Once is also implemented with a Redis lock. It builds a QueueOnce class on top of Task that provides task de-duplication, so to use it, our own task must set QueueOnce as its base:


@task(base=QueueOnce, once={'graceful': True})

The once parameter controls what happens when a duplicate task is encountered. graceful defaults to False, in which case Celery raises an AlreadyQueued exception; if you set it to True, the duplicate is dropped silently.

In addition, if you want to control which arguments make up the task's lock key, you can specify the keys parameter:


@celery.task(base=QueueOnce, once={'keys': ['a']})
def slow_add(a, b):
    sleep(30)
    return a + b
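For illustration, here is a hypothetical sketch of how such a lock key could be derived (the exact key format celery_once uses may differ; `build_once_key` is not a real celery_once function). With keys=['a'], only a contributes to the key, so slow_add(1, 10) and slow_add(1, 99) count as duplicates of each other:

```python
def build_once_key(task_name, kwargs, keys=None):
    # Use only the listed argument names (or all of them if keys is None).
    if keys is not None:
        kwargs = {k: v for k, v in kwargs.items() if k in keys}
    parts = [f"{k}-{kwargs[k]}" for k in sorted(kwargs)]
    return "qo_" + task_name + "_" + "_".join(parts)

# Same 'a', different 'b' -> same lock key: the second call is a duplicate.
k1 = build_once_key("tasks.slow_add", {"a": 1, "b": 10}, keys=["a"])
k2 = build_once_key("tasks.slow_add", {"a": 1, "b": 99}, keys=["a"])
```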

Solution steps

Celery Once lets you queue Celery tasks while preventing duplicate executions.

Installation

pip install -U celery_once

Requirements: Celery 4.0+. Older versions may work, but they are not officially supported.

With celery_once, your tasks inherit from an abstract base task class named QueueOnce.

After celery_once is installed, you need to add an ONCE section to your Celery configuration:


from celery import Celery
from celery_once import QueueOnce
from time import sleep

celery = Celery('tasks', broker='amqp://guest@localhost//')

# New: the previous configuration did not have this ONCE section, so add it
celery.conf.ONCE = {
  'backend': 'celery_once.backends.Redis',
  'settings': {
    'url': 'redis://localhost:6379/0',
    'default_timeout': 60 * 60
  }
}

# The original decorator took no arguments; add base=QueueOnce
@celery.task(base=QueueOnce)
def slow_task():
    sleep(30)
    return "Done!"

The exact settings depend on which backend is used for locking; see the project's Backends documentation. With the Redis backend above, default_timeout is how long (in seconds) a lock may be held before it expires.

Behind the scenes, QueueOnce overrides apply_async and delay; calling the task function directly is not affected.

When a task is queued, celery_once checks whether a lock (a Redis key) already exists for it; if not, the task runs normally. Once the task completes (or terminates with an exception), the lock is cleared. If you try to queue the task again before it completes, an AlreadyQueued exception is raised.

example.delay(10)
example.delay(10)
Traceback (most recent call last):
..
AlreadyQueued()

result = example.apply_async(args=(10,))
result = example.apply_async(args=(10,))
Traceback (most recent call last):
..
AlreadyQueued()

graceful: if once={'graceful': True} is set in the task's options, or passed to apply_async at call time, a duplicate call returns None instead of raising AlreadyQueued.


from celery_once import AlreadyQueued
# Either catch the exception,
try:
    example.delay(10)
except AlreadyQueued:
    pass
# Or, handle it gracefully at run time.
result = example.apply_async(args=(10,), once={'graceful': True})
# or by default.
@celery.task(base=QueueOnce, once={'graceful': True})
def slow_task():
    sleep(30)
    return "Done!"

For other functions, please visit: https://pypi.org/project/celery_once/

