Using APScheduler, a Python Timed-Task Tool

  • 2021-07-26 08:08:11
  • OfStack

APScheduler (Advanced Python Scheduler) is a timed-task tool written in Python.

Documentation: apscheduler.readthedocs.io/en/latest/u …

Features:

  • It does not depend on the Linux crontab service; it runs timed tasks on its own, independently of the operating system.
  • New timed tasks can be added dynamically. For example, if an order must be paid within 30 minutes of being placed and is cancelled otherwise, this tool can handle it by adding a timed task for the order each time an order is placed.
  • Added timed tasks can be persisted.

1 Installation

pip install apscheduler

2 Composition

APScheduler consists of the following four parts:

  • Triggers: specify when a timed task is executed.
  • Job stores: hold the timed tasks; they are kept in memory by default and can also be persisted.
  • Executors: run the task in a thread or a process when it is due.
  • Schedulers: manage the other parts. The common ones are BackgroundScheduler (runs in the background) and BlockingScheduler (blocks).

3 Usage


from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.executors.pool import ThreadPoolExecutor

# Create an executor
executors = {
    'default': ThreadPoolExecutor(20),
}
# Create a scheduler object for timed tasks and hand it the executors
scheduler = BlockingScheduler(executors=executors)

# Define the timed task
def my_job(param1, param2):  # Arguments are passed in through args of add_job()
    print(param1)  # 100
    print(param2)  # python

# Add the timed task to the scheduler
scheduler.add_job(my_job, 'date', args=[100, 'python'])
# Start the scheduler
scheduler.start()


4 Schedulers

The scheduler is responsible for managing timed tasks.

BlockingScheduler: used when the scheduler runs as a stand-alone process.


from apscheduler.schedulers.blocking import BlockingScheduler
scheduler = BlockingScheduler()
scheduler.start()  # The program blocks here

BackgroundScheduler: used inside framework applications (such as Django or Flask).

from apscheduler.schedulers.background import BackgroundScheduler
scheduler = BackgroundScheduler()
scheduler.start()  # The program does not block here

  • AsyncIOScheduler: use when your program uses asyncio.
  • GeventScheduler: use when your program uses gevent.
  • TornadoScheduler: use when your program is based on Tornado.
  • TwistedScheduler: use when your program uses Twisted.
  • QtScheduler: use when your application is a Qt application.

4 Executors

An executor runs the task in a thread or a process when the task is due.


ThreadPoolExecutor
from apscheduler.executors.pool import ThreadPoolExecutor
ThreadPoolExecutor(max_workers) 

Usage


from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.executors.pool import ThreadPoolExecutor
executors = {
    'default': ThreadPoolExecutor(20)  # At most 20 threads execute at the same time
}
scheduler = BackgroundScheduler(executors=executors)

ProcessPoolExecutor

from apscheduler.executors.pool import ProcessPoolExecutor
ProcessPoolExecutor(max_workers)

Usage


from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.executors.pool import ProcessPoolExecutor
executors = {
    'default': ProcessPoolExecutor(5)  # At most 5 processes execute at the same time
}
scheduler = BackgroundScheduler(executors=executors)

5 Triggers

A trigger specifies when a timed task runs.

1) date: executes once at a specific date and time


from datetime import date, datetime
# sched is a scheduler instance; my_job is a task function taking no arguments
# Run at 2019-11-06 00:00:00
sched.add_job(my_job, 'date', run_date=date(2019, 11, 6))
# Run at 2019-11-06 16:30:05
sched.add_job(my_job, 'date', run_date=datetime(2019, 11, 6, 16, 30, 5))
sched.add_job(my_job, 'date', run_date='2019-11-06 16:30:05')
# Run immediately
sched.add_job(my_job, 'date')
sched.start()

2) interval: executes repeatedly at a fixed time interval


weeks (int): number of weeks to wait
days (int): number of days to wait
hours (int): number of hours to wait
minutes (int): number of minutes to wait
seconds (int): number of seconds to wait
start_date (datetime|str): starting point for the interval calculation
end_date (datetime|str): latest possible date/time to trigger on
timezone (datetime.tzinfo|str): time zone to use for the date/time calculations

from datetime import datetime
# Run once every two hours
sched.add_job(job_function, 'interval', hours=2)
# Run once every two hours between 2012-10-10 09:30:00 and 2014-06-15 11:00:00
sched.add_job(job_function, 'interval', hours=2, start_date='2012-10-10 09:30:00', end_date='2014-06-15 11:00:00')

3) cron: executes periodically at specified times


year (int|str): 4-digit year
month (int|str): month (1-12)
day (int|str): day of the month (1-31)
week (int|str): ISO week (1-53)
day_of_week (int|str): number or name of weekday (0-6 or mon,tue,wed,thu,fri,sat,sun)
hour (int|str): hour (0-23)
minute (int|str): minute (0-59)
second (int|str): second (0-59)
start_date (datetime|str): earliest possible date/time to trigger on (inclusive)
end_date (datetime|str): latest possible date/time to trigger on (inclusive)
timezone (datetime.tzinfo|str): time zone to use for the date/time calculations (defaults to scheduler timezone)

# Run on the third Friday of June, July, August, November and December at 00:00, 01:00, 02:00 and 03:00
sched.add_job(job_function, 'cron', month='6-8,11-12', day='3rd fri', hour='0-3')

# Run Monday to Friday at 05:30 until 2014-05-30
sched.add_job(job_function, 'cron', day_of_week='mon-fri', hour=5, minute=30, end_date='2014-05-30')


6 Job stores


MemoryJobStore: the default; tasks are kept in memory.

MongoDBJobStore: tasks are saved to MongoDB.

from apscheduler.jobstores.mongodb import MongoDBJobStore
MongoDBJobStore()

RedisJobStore: tasks are saved to Redis.

from apscheduler.jobstores.redis import RedisJobStore
RedisJobStore()

7 Configuration method

Method 1


from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.executors.pool import ThreadPoolExecutor

executors = {
 'default': ThreadPoolExecutor(20),
}
conf = {  # Redis configuration
    "host": "127.0.0.1",
    "port": 6379,
    "db": 15,  # Connect to database number 15
    "max_connections": 10  # Maximum number of connections to Redis
}
scheduler = BackgroundScheduler(executors=executors)
# Add persistent storage for tasks; if Redis is not installed, this step can be omitted
scheduler.add_jobstore(jobstore='redis', **conf)

Method 2



8 Startup

scheduler.start()

With BlockingScheduler, start() blocks, which keeps the program from exiting when the scheduler runs as a stand-alone process (for example, to generate static pages).

With BackgroundScheduler, start() returns immediately, so the scheduler can run inside an application instead of as a separate process (for example, to cancel an order that has not been paid within 30 minutes).

9 Extension

Task management

Mode 1



Mode 2



Adjust the task scheduling cycle


job.modify(max_instances=6, name='Alternate name')
scheduler.reschedule_job('my_job_id', trigger='cron', minute='*/5')

Stop APScheduler

scheduler.shutdown()

10 Comprehensive use

Here is one way to cancel an order that has not been paid within 30 minutes; it can be implemented in a Flask or Django program. In this example the timed task is added dynamically inside a Django application, so the scheduler must be a BackgroundScheduler. First, define the task that performs the order cancellation.
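A cancellation task along these lines might look as follows. This is only a sketch: the in-memory `ORDERS` dictionary stands in for whatever order model a real project has, and the status names are invented for the example.

```python
# Stand-in for the order table; a real project would use its own Django model.
ORDERS = {
    1001: {'status': 'unpaid'},
    1002: {'status': 'paid'},
}


def cancel_order_job(order_id):
    """Cancel the order only if it is still unpaid when the task fires."""
    order = ORDERS.get(order_id)
    if order is None or order['status'] != 'unpaid':
        return False  # paid, already cancelled, or unknown: do nothing
    order['status'] = 'cancelled'
    # Depending on the table design, related rows may need updating here too.
    return True


print(cancel_order_job(1001), cancel_order_job(1002))
```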




Which tables to update depends on your own table design; the above is only the general idea. Next, in the view that creates the order, schedule the cancellation task at the same time. Pass cancel_order_job() the parameters it needs, and make sure the task checks that the order is still unpaid before cancelling it.




Note: if a periodically executed timed task uses Django model classes, Flask configuration, or other framework-specific information, the framework's configuration must be imported before the task runs.
