Use Celery Once to prevent Celery from performing the same task repeatedly

  • 2021-12-11 18:21:37
  • OfStack

When using Celery, I found that it sometimes executes the same task twice. In my case, the same task was run by two different workers only a few milliseconds apart. At first I suspected my own logic, but I later found that other people had hit similar problems, and almost all of the reports involved Redis as the broker. I didn't want to replace Redis, so the remaining option was to take a distributed lock when the task runs.

After searching the Celery issues, I found that some people implemented the distributed lock directly with Redis, while others used Celery Once. I took a quick look at Celery Once and found that it matched my situation well, so I used it.

Celery Once also implements its lock with Redis. It provides QueueOnce, a base class built on Task, which adds task de-duplication. To use it, set QueueOnce as the base of your own task:

@task(base=QueueOnce, once={'graceful': True})

The once parameter controls what happens when a duplicate task is encountered. graceful defaults to False, in which case an AlreadyQueued exception is raised; if you set it to True, the duplicate is handled silently.

In addition, if you want to control which arguments make up the task's lock key, specify the keys parameter:


@celery.task(base=QueueOnce, once={'keys': ['a']})
def slow_add(a, b):
    sleep(30)
    return a + b
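
To see what restricting the keys changes, here is a minimal sketch of building a lock name from the task name and a subset of its arguments. The function `lock_key` and the exact string format are assumptions made for illustration; the real key format used by Celery Once may differ.

```python
def lock_key(task_name, kwargs, keys=None):
    """Build a lock name from the task name and the selected arguments."""
    # With keys=None every argument takes part in the lock name.
    selected = kwargs if keys is None else {k: kwargs[k] for k in keys}
    parts = [task_name] + [f"{k}-{selected[k]}" for k in sorted(selected)]
    return "_".join(parts)


# All arguments are used: different b means a different lock.
print(lock_key("slow_add", {"a": 1, "b": 2}))
print(lock_key("slow_add", {"a": 1, "b": 3}))

# Only 'a' is used: both calls map to the same lock.
print(lock_key("slow_add", {"a": 1, "b": 2}, keys=["a"]))
print(lock_key("slow_add", {"a": 1, "b": 3}, keys=["a"]))
```

Under this model, with keys set to ['a'], slow_add(1, 2) and slow_add(1, 3) share one lock, so the second call counts as a duplicate while the first still holds it.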

In general, the setup takes the following steps.

Step 1: Install

pip install -U celery_once

Step 2: Add configuration


from celery import Celery
from celery_once import QueueOnce
from time import sleep

celery = Celery('tasks', broker='amqp://guest@localhost//')
celery.conf.ONCE = {
  'backend': 'celery_once.backends.Redis',
  'settings': {
    'url': 'redis://localhost:6379/0',
    'default_timeout': 60 * 60
  }
}

Step 3: Modify the delay method


example.delay(10)
# Change to:
result = example.apply_async(args=(10,))
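
One detail worth checking when passing args to apply_async: Celery expects a list or tuple there, and in Python a one-element tuple needs a trailing comma. The plain-Python sketch below shows the difference; `example` here is an ordinary function standing in for the task, not a Celery task.

```python
def example(x):
    return x * 2


args_wrong = (10)   # parentheses alone do not make a tuple; this is the int 10
args_right = (10,)  # the trailing comma makes a one-element tuple

print(type(args_wrong).__name__)  # int
print(type(args_right).__name__)  # tuple
print(example(*args_right))       # 20
```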

Step 4: Modify task parameters


@celery.task(base=QueueOnce, once={'graceful': True, 'keys': ['a']})
def slow_add(a, b):
    sleep(30)
    return a + b

Reference: https://github.com/cameronmaske/celery-once

