Using APScheduler, a Python Scheduled-Task Tool
- 2021-07-26 08:08:11
- OfStack
APScheduler (Advanced Python Scheduler) is a scheduled-task tool written in Python.
Documentation: apscheduler.readthedocs.io/en/latest/u …
Features:
- It does not depend on the Linux crontab system; it runs scheduled tasks on its own.
- New scheduled tasks can be added dynamically. For example, an order must be paid within 30 minutes of being placed or it is cancelled; this tool handles that case by adding a scheduled task for the order each time one is placed.
- Added scheduled tasks can be persisted.
1 Installation
pip install apscheduler
2 Composition
APScheduler consists of the following four parts:
- Triggers specify when a scheduled task runs.
- Job stores hold the scheduled tasks and can persist them.
- Executors run a task in a thread or a process when it comes due.
- Schedulers tie the other parts together; the common ones are BackgroundScheduler (runs in the background) and BlockingScheduler (blocks).
3 Usage
from apscheduler.executors.pool import ThreadPoolExecutor
from apscheduler.schedulers.blocking import BlockingScheduler
# Create the executors
executors = {
    'default': ThreadPoolExecutor(20),
}
# Create the scheduler object; executors are configured on the scheduler
scheduler = BlockingScheduler(executors=executors)
# Define the scheduled task
def my_job(param1, param2):  # Arguments are passed in through args in add_job()
    print(param1)  # 100
    print(param2)  # python
# Add the scheduled task to the scheduler
scheduler.add_job(my_job, 'date', args=[100, 'python'])
# Start the scheduler
scheduler.start()
4 Schedulers
Schedulers are responsible for managing scheduled tasks.
BlockingScheduler: used when the scheduler runs as a stand-alone process.
from apscheduler.schedulers.blocking import BlockingScheduler
scheduler = BlockingScheduler()
scheduler.start() # The program will block here
BackgroundScheduler: used inside framework applications (such as Django or Flask).
from apscheduler.schedulers.background import BackgroundScheduler
scheduler = BackgroundScheduler()
scheduler.start() # The program will not block here
AsyncIOScheduler: use when your program uses asyncio.
GeventScheduler: use when your program uses gevent.
TornadoScheduler: use when your program is based on Tornado.
TwistedScheduler: use when your program uses Twisted.
QtScheduler: use when your application is a Qt application.
4 Executors
Executors run a task in a thread or a process when it comes due.
ThreadPoolExecutor
from apscheduler.executors.pool import ThreadPoolExecutor
ThreadPoolExecutor(max_workers)
Usage
from apscheduler.executors.pool import ThreadPoolExecutor
from apscheduler.schedulers.background import BackgroundScheduler
executors = {
    'default': ThreadPoolExecutor(20)  # At most 20 threads run at the same time
}
scheduler = BackgroundScheduler(executors=executors)
ProcessPoolExecutor
from apscheduler.executors.pool import ProcessPoolExecutor
ProcessPoolExecutor(max_workers)
Usage
from apscheduler.executors.pool import ProcessPoolExecutor
from apscheduler.schedulers.background import BackgroundScheduler
executors = {
    'default': ProcessPoolExecutor(5)  # At most 5 processes run at the same time
}
scheduler = BackgroundScheduler(executors=executors)
5 Triggers
Triggers specify when a scheduled task runs.
1) date: runs once at a specific date and time
from datetime import date, datetime
# sched is a scheduler instance created as in the sections above
# Run at 2019-11-06 00:00:00
sched.add_job(my_job, 'date', run_date=date(2019, 11, 6))
# Run at 2009-11-06 16:30:05
sched.add_job(my_job, 'date', run_date=datetime(2009, 11, 6, 16, 30, 5))
sched.add_job(my_job, 'date', run_date='2009-11-06 16:30:05')
# Run immediately
sched.add_job(my_job, 'date')
sched.start()
2) interval: runs repeatedly at a fixed time interval
weeks (int): the number of weeks to wait
days (int): the number of days to wait
hours (int): the number of hours to wait
minutes (int): the number of minutes to wait
seconds (int): the number of seconds to wait
start_date (datetime|str): the starting point for the interval calculation
end_date (datetime|str): the latest possible date/time to trigger on
timezone (datetime.tzinfo|str): the time zone to use for the date/time calculations
from datetime import datetime
# Run every two hours
sched.add_job(job_function, 'interval', hours=2)
# Run every two hours between 2012-10-10 09:30:00 and 2014-06-15 11:00:00
sched.add_job(job_function, 'interval', hours=2, start_date='2012-10-10 09:30:00', end_date='2014-06-15 11:00:00')
3) cron: runs periodically at specified times
year (int|str): the 4-digit year
month (int|str): the month (1-12)
day (int|str): the day of the month (1-31)
week (int|str): the ISO week (1-53)
day_of_week (int|str): the number or name of the weekday (0-6 or mon,tue,wed,thu,fri,sat,sun)
hour (int|str): the hour (0-23)
minute (int|str): the minute (0-59)
second (int|str): the second (0-59)
start_date (datetime|str): the earliest possible date/time to trigger on (inclusive)
end_date (datetime|str): the latest possible date/time to trigger on (inclusive)
timezone (datetime.tzinfo|str): the time zone to use for the date/time calculations (defaults to the scheduler timezone)
# Run on the third Friday of June, July, August, November and December at 00:00, 01:00, 02:00 and 03:00
sched.add_job(job_function, 'cron', month='6-8,11-12', day='3rd fri', hour='0-3')
# Run Monday to Friday at 5:30 until 2014-05-30
sched.add_job(job_function, 'cron', day_of_week='mon-fri', hour=5, minute=30, end_date='2014-05-30')
6 Job stores
MemoryJobStore: keeps jobs in memory (the default)
MongoDBJobStore: saves jobs to MongoDB
from apscheduler.jobstores.mongodb import MongoDBJobStore
MongoDBJobStore()
RedisJobStore: saves jobs to Redis
from apscheduler.jobstores.redis import RedisJobStore
RedisJobStore()
7 Configuration methods
Method 1
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.executors.pool import ThreadPoolExecutor
executors = {
    'default': ThreadPoolExecutor(20),
}
conf = {  # Redis configuration
    "host": "127.0.0.1",
    "port": 6379,
    "db": 15,  # Connect to database number 15
    "max_connections": 10,  # Pool size; the Redis server here supports at most 300 connections
}
scheduler = BackgroundScheduler(executors=executors)
# Add persistent storage for jobs; skip this step if Redis is not installed
scheduler.add_jobstore(jobstore='redis', **conf)
Method 2
8 Startup
scheduler.start()
With BlockingScheduler, the program blocks here, which keeps it from exiting when the scheduler runs as a stand-alone process (for example, to generate static pages).
With BackgroundScheduler, the scheduler runs inside an application rather than as a separate process (for example, to cancel orders that are not paid within 30 minutes).
9 Extension
Task management
Mode 1
Mode 2
Adjust the task scheduling cycle
job.modify(max_instances=6, name='Alternate name')
scheduler.reschedule_job('my_job_id', trigger='cron', minute='*/5')
Stopping APScheduler
scheduler.shutdown()
10 Putting it together
Here is one way to cancel orders that are not paid within 30 minutes; it can be implemented in a Flask or Django program. In this example a scheduled task is added dynamically inside a Django application, so the scheduler must be a BackgroundScheduler. First, define the task that cancels the order.
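A cancellation task along these lines might look like the sketch below. It is only an illustration: the ORDERS dictionary stands in for the project's own order table, and the status names are hypothetical.

```python
# Stand-in for the orders table; a real project would use its own models.
ORDERS = {
    1001: {'status': 'unpaid'},
    1002: {'status': 'paid'},
}


def cancel_order_job(order_id):
    """Cancel the order only if it is still unpaid when the job runs."""
    order = ORDERS.get(order_id)
    if order is None or order['status'] != 'unpaid':
        # Paid, already cancelled or unknown orders are left alone.
        return False
    order['status'] = 'cancelled'
    return True


results = [
    cancel_order_job(1001),  # unpaid order: gets cancelled
    cancel_order_job(1002),  # paid order: untouched
    cancel_order_job(9999),  # unknown order: nothing to do
]
print(results)
```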
Which tables to update depends on your own schema; this is only the general idea. Then, in the view that creates the order, add the order-cancellation job at the same time, passing cancel_order_job() the parameters it needs. The job should confirm that the order is still unpaid before cancelling it.
Note: if a scheduled task that runs periodically uses Django model classes, Flask configuration, or similar framework resources, the framework's configuration must be imported before the task runs.