Implementation of Django Celery asynchronous task queue

  • 2021-07-26 08:13:15
  • OfStack

Background

In development, we often encounter time-consuming tasks. For example:

Uploading and parsing an Excel file with 10,000 rows of data, then persisting them to the database.

In my program, this task takes about 6 seconds, and a 6-second wait is already a disaster for users.

A better way to handle it is:

1. Receive the request for the task;
2. Add the task to a queue;
3. Immediately return "Operation succeeded, processing in the background";
4. Consume the queue in the background and perform the task.

According to this idea, we implemented it with the help of Celery.

Implementation

The environment used in this article is as follows:

  • Python 3.6.7
  • RabbitMQ 3.8
  • Celery 4.3

Installing RabbitMQ with Docker

Celery relies on a message broker; the options include RabbitMQ, Redis, and others. This article uses RabbitMQ.

For convenience, I install RabbitMQ directly with Docker:


docker run -d --name anno-rabbit -p 5672:5672 rabbitmq:3

After a successful startup, the message queue is reachable at amqp://localhost.
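Before wiring up Celery, it can be worth confirming that the broker is actually reachable. Here is a minimal sketch (a hypothetical helper, not part of the project code) that checks whether anything is listening on the default AMQP port that the docker command above exposes:

```python
import socket

# Hypothetical helper: returns True if a TCP connection to the
# given host/port succeeds, i.e. RabbitMQ is accepting connections.
def broker_reachable(host="localhost", port=5672, timeout=2.0):
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

print(broker_reachable())  # True once the RabbitMQ container is up
```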

Install and configure Celery

Celery is implemented in Python, and installation can be done directly with pip:


pip install celery

Assume that the project folder is proj, the project name is myproj, and the application name is myapp.
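Assuming a standard Django layout, the files touched in this article would sit roughly like this (manage.py and settings.py come from the usual Django scaffolding):

```
proj/
├── manage.py
├── myproj/
│   ├── __init__.py
│   ├── settings.py
│   └── celery.py
└── myapp/
    ├── tasks.py
    └── views.py
```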

After installation, create a celery.py file under proj/myproj/ to initialize the Celery instance:

proj/myproj/celery.py


from __future__ import absolute_import, unicode_literals
import os
from celery import Celery, platforms

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproj.settings')

app = Celery('myproj',
       broker='amqp://localhost//',
       backend='amqp://localhost//')

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#  should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django app configs.
app.autodiscover_tasks()
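
Because of namespace='CELERY', any Celery option can also be supplied from Django's settings.py by prefixing it with CELERY_. A sketch of what that could look like (the values below simply mirror the broker and backend hard-coded above, so they are redundant here and shown only to illustrate the mapping):

```python
# proj/myproj/settings.py (excerpt)
# With namespace='CELERY', these keys map to Celery's
# broker_url, result_backend and task_serializer options.
CELERY_BROKER_URL = 'amqp://localhost//'
CELERY_RESULT_BACKEND = 'amqp://localhost//'
CELERY_TASK_SERIALIZER = 'json'
```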

Then add a reference to the Celery object in proj/myproj/__init__.py to ensure that Celery is initialized when Django starts:

proj/myproj/__init__.py


from __future__ import absolute_import, unicode_literals

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app

__all__ = ('celery_app',)

Without any other special configuration, this completes the basic Celery setup.

Write a time-consuming task

To simulate a time-consuming task, we create a method that simply sleeps for 10 seconds and register it as a Celery task:

proj/myapp/tasks.py


import time
from myproj.celery import app as celery_app

@celery_app.task
def waste_time():
  time.sleep(10)
  return "Run function 'waste_time' finished."

Start Celery Worker

After the Celery configuration is complete and the task is created successfully, we start Celery in asynchronous task mode:


celery -A myproj worker -l info

Note that I emphasize the asynchronous (worker) mode because Celery also supports scheduled/periodic tasks, so the mode should be specified at startup.

Also note that once the Celery worker is started, modifications to a task (here waste_time) will not take effect until the worker is restarted.

Task invocation

In the logic code for request processing, call the task created above:

proj/myapp/views.py


from django.http import JsonResponse
from django.views.decorators.http import require_http_methods
from .tasks import waste_time

@require_http_methods(["POST"])
def upload_files(request):
  waste_time.delay()
  # Status code 202 (Accepted) indicates that the asynchronous task
  # has been accepted and may still be in progress.
  return JsonResponse({"results": "Operation successful, uploading in progress, please wait..."}, status=202)

After waste_time.delay() is called, waste_time is added to the task queue, waiting to be picked up by an idle Celery worker.

Effect

When we send the request, the interface immediately returns {"results": "Operation successful, uploading in progress, please wait..."} instead of blocking for 10 seconds, which makes the user experience much better.

Summary

Using Celery to handle such asynchronous tasks is a common approach in Python. Although the actual execution time is unchanged or even longer (for example, when a busy worker delays processing), the user experience is far more acceptable: after uploading a large file, the user can move on to other work instead of waiting on the page.
There are more uses of Celery that this article does not cover; the documentation is very detailed, so refer to it directly when needed.

Reference

http://docs.celeryproject.org/en/latest/django/first-steps-with-django.html

https://hub.docker.com/_/rabbitmq

