Dynamically adding scheduled tasks in Django with django-celery-beat

  • 2021-12-11 08:00:03
  • OfStack

Version information


# Installed package versions
Django==2.2.2
django-celery-beat==2.1.0
django-redis==4.8.0
mysqlclient==2.0.0
django-mysql==3.2.0
redis==3.2.1
uWSGI==2.0.17.1
django-redis-cache==2.1.0

Installation and Configuration

Install the package versions listed above, then configure settings.py.

# Django time zone configuration
TIME_ZONE = 'Asia/Shanghai'
# If USE_TZ is True, Django stores datetimes in UTC and uses TIME_ZONE only as the default zone for display and form input.
# If USE_TZ is False, Django stores naive datetimes in the local time of TIME_ZONE (here, Shanghai time).
USE_TZ = False

INSTALLED_APPS = (
    ...,
    'django_celery_beat',
)

# Celery beat configuration
# CELERY_ENABLE_UTC = False
CELERY_TIMEZONE = TIME_ZONE
DJANGO_CELERY_BEAT_TZ_AWARE = False
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'
# Number of concurrent worker processes
CELERY_WORKER_CONCURRENCY = 10
# Prefetch multiplier: each worker process reserves up to n tasks at a time, which reduces broker round trips.
CELERY_WORKER_PREFETCH_MULTIPLIER = 20
# Can prevent deadlocks in some cases
CELERYD_FORCE_EXECV = True
# Number of tasks a worker process executes before it is replaced by a new one
CELERY_WORKER_MAX_TASKS_PER_CHILD = 100
# Disable all rate limits. Running at full speed is not recommended when network resources are limited.
CELERY_WORKER_DISABLE_RATE_LIMITS = True
# Broker URL
CELERY_BROKER_URL = 'redis://127.0.0.1:6379/2'
# Result backend
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/1'
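
To make the USE_TZ comments in the settings above concrete, here is a small standard-library sketch (not Django code) of the difference between a naive local time and an aware UTC time. The fixed UTC+8 offset standing in for Asia/Shanghai and the helper name naive_local_to_utc are assumptions made for illustration.

```python
from datetime import datetime, timedelta, timezone

# Assumption: a fixed UTC+8 offset stands in for 'Asia/Shanghai'.
SHANGHAI = timezone(timedelta(hours=8), name="Asia/Shanghai")


def naive_local_to_utc(naive_local: datetime) -> datetime:
    """Interpret a naive datetime as Shanghai local time and convert it to aware UTC."""
    return naive_local.replace(tzinfo=SHANGHAI).astimezone(timezone.utc)


# With USE_TZ = False, datetimes are naive values expressed in TIME_ZONE (local time).
local_run_time = datetime(2021, 12, 11, 8, 0, 3)

# With USE_TZ = True, the same instant would be handled as an aware UTC value.
utc_run_time = naive_local_to_utc(local_run_time)
print(utc_run_time.isoformat())  # 2021-12-11T00:00:03+00:00
```

Keeping CELERY_TIMEZONE equal to TIME_ZONE, as the settings do, means beat and Django interpret schedule times in the same zone.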

Create the database tables

python manage.py migrate


# Models (tables) created by the migration
django_celery_beat.models.PeriodicTask      Defines a single periodic task to run.
django_celery_beat.models.IntervalSchedule  A schedule that runs at a fixed interval (for example, every 5 seconds).
django_celery_beat.models.CrontabSchedule   A schedule with cron-like fields: minute, hour, day_of_week, day_of_month, month_of_year.
django_celery_beat.models.PeriodicTasks     Used only as an index to track when the schedule has changed.

Configure celery.py in the project directory


# -*- coding: utf-8 -*-
# @File:    celery.py
# @Content: Celery scheduled task configuration


import os
from celery import Celery

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "django_server.settings")
app = Celery("django_server")
# Read every Django setting that starts with the CELERY_ prefix
app.config_from_object("django.conf:settings", namespace="CELERY")
# Modules that contain the scheduled tasks
app.autodiscover_tasks(["monitoring.tasks", "wechat.tasks"])
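
Because celery.py passes namespace="CELERY", Celery reads the Django settings whose names start with CELERY_, and the rest of each name corresponds to a lowercase Celery setting. The helper below is a hypothetical plain-Python illustration of that naming convention only; celery_keys_from_settings is an invented name and this is not how Celery implements the lookup internally.

```python
def celery_keys_from_settings(django_settings: dict, namespace: str = "CELERY") -> dict:
    """Hypothetical helper: map CELERY_-prefixed names to lowercase Celery setting names."""
    prefix = namespace + "_"
    return {
        name[len(prefix):].lower(): value
        for name, value in django_settings.items()
        if name.startswith(prefix)
    }


example_settings = {
    "CELERY_BROKER_URL": "redis://127.0.0.1:6379/2",
    "CELERY_WORKER_CONCURRENCY": 10,
    "TIME_ZONE": "Asia/Shanghai",  # no CELERY_ prefix, so it is skipped
}
print(celery_keys_from_settings(example_settings))
# {'broker_url': 'redis://127.0.0.1:6379/2', 'worker_concurrency': 10}
```

Under this convention, a name that lacks the exact CELERY_ prefix would not be picked up, so it is worth checking each setting name against the Celery configuration reference.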

Create a task in tasks.py


from celery import shared_task

@shared_task
def alarm_monitor_task(*args, **kwargs):
    # Accepts the positional and keyword arguments stored on the PeriodicTask
    print("Scheduled task!")

Create a scheduled task


import json

from django_celery_beat.models import PeriodicTask, IntervalSchedule

# ----- Interval-based periodic task -----
# Create (or fetch) a 10-minute interval object
schedule, _ = IntervalSchedule.objects.get_or_create(every=10, period=IntervalSchedule.MINUTES)
# Update the task if it exists, create it otherwise
PeriodicTask.objects.update_or_create(
  defaults={
    "interval": schedule,  # the 10-minute interval object created above
    "task": "monitoring.tasks.alarm_monitor_task",  # the task to run periodically
    "args": json.dumps(['arg1', 'arg2']),  # positional arguments; the task must accept them
    "kwargs": json.dumps({"a": 1, "b": 2}, ensure_ascii=False),  # keyword arguments
  },
  name="Scheduled task - task",
)
# Available period units for IntervalSchedule
IntervalSchedule.DAYS          # fixed interval in days
IntervalSchedule.HOURS         # fixed interval in hours
IntervalSchedule.MINUTES       # fixed interval in minutes
IntervalSchedule.SECONDS       # fixed interval in seconds
IntervalSchedule.MICROSECONDS  # fixed interval in microseconds
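
The args and kwargs fields of a PeriodicTask hold JSON text, which is why the example above passes them through json.dumps. As a rough sketch of what that implies for the task signature, the plain-Python snippet below decodes such strings and applies them to a function. fake_task is a hypothetical stand-in for the real task, and no Celery machinery is involved.

```python
import json


def fake_task(*args, **kwargs):
    """Hypothetical stand-in for a Celery task; returns what it received."""
    return args, kwargs


# The same JSON strings that would be stored on the PeriodicTask row.
stored_args = json.dumps(["arg1", "arg2"])
stored_kwargs = json.dumps({"a": 1, "b": 2}, ensure_ascii=False)

# Decode and apply them, roughly as a scheduler would before sending the task.
received_args, received_kwargs = fake_task(
    *json.loads(stored_args), **json.loads(stored_kwargs)
)
print(received_args, received_kwargs)  # ('arg1', 'arg2') {'a': 1, 'b': 2}
```

A function declared with only **kwargs raises TypeError when it is called with positional arguments, so the stored args need to match the signature of the task they are sent to.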


# ----- Crontab-based periodic task -----
import json

import pytz
from django_celery_beat.models import CrontabSchedule, PeriodicTask

# Create a schedule that fires every 30 minutes
crontab, _ = CrontabSchedule.objects.get_or_create(
  minute="*/30",
  hour="*",
  day_of_week="*",
  day_of_month='*',
  month_of_year='*',
  timezone=pytz.timezone("Asia/Shanghai"),
)
# Update the task if it exists, create it otherwise
# (task_name and kwargs are supplied by the calling code)
PeriodicTask.objects.update_or_create(
  name=task_name,
  defaults={
    "kwargs": json.dumps(kwargs, ensure_ascii=False),
    "task": "wechat.tasks.subscribe_task",
    "crontab": crontab,
  },
)
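
To show what a field such as minute="*/30" selects, here is a minimal sketch that expands a simple cron field into the values it matches. expand_cron_field is a hypothetical helper written for this illustration; it handles only the basic forms listed in its docstring and is not the parser Celery uses.

```python
def expand_cron_field(expr: str, low: int, high: int) -> list:
    """Expand a simple cron field ('*', '*/n', 'a', 'a-b', 'a,b') into sorted values.

    Illustrative only: this is not the parser used by Celery.
    """
    values = set()
    for part in expr.split(","):
        base, _, step_text = part.partition("/")
        step = int(step_text) if step_text else 1
        if base == "*":
            start, end = low, high
        elif "-" in base:
            start_text, end_text = base.split("-")
            start, end = int(start_text), int(end_text)
        else:
            start = end = int(base)
        values.update(range(start, end + 1, step))
    return sorted(values)


print(expand_cron_field("*/30", 0, 59))  # [0, 30]
print(expand_cron_field("1-5", 0, 6))    # [1, 2, 3, 4, 5]
```

A crontab schedule with minute="*/30" therefore fires at minute 0 and minute 30 of every hour, whereas an IntervalSchedule of 30 minutes is a fixed gap between runs that is not tied to the clock.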

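from django_celery_beat.models import PeriodicTask, PeriodicTasks

# Delete a task (sub_id is supplied by the calling code)
tasks = PeriodicTask.objects.filter(name__startswith=sub_id)
if tasks:
    tasks.update(enabled=False)
    tasks.delete()


# Pause or resume a task (a truthy status enables it, a falsy status disables it)
tasks = PeriodicTask.objects.filter(name__startswith=sub_id)
if tasks:
    tasks.update(enabled=bool(status))
    # QuerySet.update() sends no save signals, so tell beat that the schedule changed
    PeriodicTasks.update_changed()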

Run the tasks


# Start the worker
celery -A django_server worker -l INFO --logfile=/var/log/dec_server/worker.log

# Start the beat scheduler
celery -A django_server beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler --logfile=/var/log/dec_server/beat.log


Reference links:

https://github.com/celery/django-celery-beat

https://pypi.org/project/django-celery-beat/

