Method for Django to dynamically add timed tasks using django celery beat
- 2021-12-11 08:00:03
- OfStack
Version information
# Plug-in installation
Django==2.2.2
django-celery-beat==2.1.0
django-redis==4.8.0
mysqlclient==2.0.0
django-mysql==3.2.0
redis==3.2.1
uWSGI==2.0.17.1
django-redis-cache==2.1.0
Installation and Configuration
Install the versions listed above (plus a matching Celery release), then configure settings.py
# Django time zone configuration
TIME_ZONE = 'Asia/Shanghai'
# With USE_TZ = True, Django stores datetimes in UTC and TIME_ZONE is only the
# default zone used when displaying or interpreting datetimes.
# With USE_TZ = False, Django works with naive datetimes in local time, i.e.
# Shanghai time when TIME_ZONE = 'Asia/Shanghai'.
USE_TZ = False
INSTALLED_APPS = (
    ...,  # placeholder: keep the project's existing apps here
    'django_celery_beat',
)
# Celery beat configuration.
# The Celery app loads these with namespace="CELERY", so every Celery option
# must be spelled CELERY_<NEW_STYLE_SETTING_NAME>; other spellings are ignored.
# CELERY_ENABLE_UTC = False
CELERY_TIMEZONE = TIME_ZONE
DJANGO_CELERY_BEAT_TZ_AWARE = False
# Must be an importable path: the module name uses underscores, not hyphens.
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'
# Number of concurrent worker processes
CELERY_WORKER_CONCURRENCY = 10
# Prefetch multiplier: each worker process reserves up to this many messages
# at a time, which cuts down on round-trips to the broker.
CELERY_WORKER_PREFETCH_MULTIPLIER = 20
# Deadlock can be prevented in some cases.
# NOTE(review): this looks like a legacy Celery 3.x setting name and is
# presumably not picked up under the CELERY_ namespace - confirm and drop.
CELERYD_FORCE_EXECV = True
# A worker process is replaced after it has executed this many tasks
CELERY_WORKER_MAX_TASKS_PER_CHILD = 100
# Disable all rate limits. If network resources are limited, running at full
# speed is not recommended.
CELERY_WORKER_DISABLE_RATE_LIMITS = True
# Message broker
CELERY_BROKER_URL = 'redis://127.0.0.1:6379/2'
# Result backend
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/1'
Generate database
python manage.py migrate
# Table structure generated after migration
django_celery_beat.models.PeriodicTask: This model defines a single periodic task to run.
django_celery_beat.models.IntervalSchedule: A schedule that runs at a specific interval (for example, every 5 seconds).
django_celery_beat.models.CrontabSchedule: A schedule with cron-like fields: minute, hour, day_of_week, day_of_month, month_of_year.
django_celery_beat.models.PeriodicTasks: This model is used only as an index to track when the schedule has changed.
Configure celery.py in the project directory
# -*- coding: utf-8 -*-
# @File: celeryc.py
# @Content: celery Timed task configuration
import os
from celery import Celery, platforms
from celery.schedules import crontab
from django.conf import settings
# Point Django at the project settings unless the environment already does.
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "django_server.settings")
app = Celery("django_server")
# Read every CELERY_-prefixed option from the Django settings module.
app.config_from_object("django.conf:settings", namespace="CELERY")
# Where the timed tasks live. autodiscover_tasks() takes *package* names and
# imports "<package>.tasks" itself, so list the apps, not their tasks modules.
app.autodiscover_tasks(["monitoring", "wechat"])
Create a task in tasks.py
from celery import shared_task
@shared_task
def alarm_monitor_task(*args, **kwargs):
    """Demo timed task: print a marker line each time it is executed.

    Any positional or keyword arguments are accepted and ignored, so the task
    can be scheduled with both ``args`` and ``kwargs`` without raising
    ``TypeError``.
    """
    print(" Timed mission! ! ! ")
Create a timed task
from django_celery_beat.models import PeriodicTask, IntervalSchedule
----- Periodic task
import json

# Fetch the "every 10 minutes" interval schedule, creating it only if it does
# not exist yet (get_or_create avoids re-saving an unchanged row).
schedule, _ = IntervalSchedule.objects.get_or_create(
    every=10,
    period=IntervalSchedule.MINUTES,
)
# Update the task if one with this name exists; create it otherwise.
PeriodicTask.objects.update_or_create(
    name=" Timing task -task",
    defaults={
        # The 10-minute interval schedule obtained above
        "interval": schedule,
        # Dotted path of the task to run periodically
        "task": "monitoring.tasks.alarm_monitor_task",
        # Positional arguments, JSON-encoded; the task must accept them
        "args": json.dumps(['arg1', 'arg2']),
        # Keyword arguments, JSON-encoded
        "kwargs": json.dumps({"a": 1, "b": 2}, ensure_ascii=False),
    },
)
# Available period units for an interval schedule
IntervalSchedule.DAYS: interval measured in days
IntervalSchedule.HOURS: interval measured in hours
IntervalSchedule.MINUTES: interval measured in minutes
IntervalSchedule.SECONDS: interval measured in seconds
IntervalSchedule.MICROSECONDS: interval measured in microseconds
----Crontab Periodic task
import json

import pytz
from django_celery_beat.models import CrontabSchedule, PeriodicTask

# Fetch the "every 30 minutes" crontab schedule, creating it only if it does
# not exist yet (get_or_create avoids re-saving an unchanged row).
crontab, _ = CrontabSchedule.objects.get_or_create(
    minute="*/30",
    hour="*",
    day_of_week="*",
    day_of_month='*',
    month_of_year='*',
    timezone=pytz.timezone("Asia/Shanghai"),
)
# Update the task if one with this name exists; create it otherwise.
# task_name and kwargs are placeholders supplied by the calling code.
PeriodicTask.objects.update_or_create(
    name=task_name,
    defaults={
        "kwargs": json.dumps(kwargs, ensure_ascii=False),
        "task": "wechat.tasks.subscribe_task",
        "crontab": crontab,
    },
)
# Delete every periodic task whose name starts with sub_id
# (sub_id is a placeholder supplied by the calling code).
task = PeriodicTask.objects.filter(name__startswith=sub_id)
# exists() issues a cheap query instead of loading all matching rows.
if task.exists():
    # Disable first, then remove the rows.
    task.update(enabled=False)
    task.delete()
# Pause or resume every periodic task whose name starts with sub_id:
# a truthy status enables the tasks, a falsy one pauses them
# (sub_id and status are placeholders supplied by the calling code).
from django_celery_beat.models import PeriodicTasks

tasks = PeriodicTask.objects.filter(name__startswith=sub_id)
# update() returns the number of matched rows; it bypasses model signals, so
# the schedule-changed marker has to be bumped by hand for beat to reload.
if tasks.update(enabled=bool(status)):
    PeriodicTasks.update_changed()
Run a task
# Start a Celery worker
celery -A django_server worker -l INFO --logfile=/var/log/dec_server/worker.log
# Start the beat scheduler that triggers the timed tasks
celery -A django_server beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler --logfile=/var/log/dec_server/beat.log
Tips:
Reference link:
https://github.com/celery/django-celery-beat
https://pypi.org/project/django-celery-beat/