Python Scheduled Tasks with APScheduler: A Detailed Explanation with Examples

  • 2021-07-24 11:15:28
  • OfStack

APScheduler supports three ways of scheduling jobs: at a fixed time interval, at a fixed point in time (date), and with Linux crontab-style expressions. It also supports running scheduled jobs asynchronously and in the background.

1. Basic architecture

  • Triggers: describe when a job is triggered: on a specific date, at a fixed time interval, or according to a cron-style expression.
  • Job stores: hold the scheduled jobs, either in memory (the default) or in a database. Note: a job store must not be shared between schedulers.
  • Executors: run the jobs. An executor submits the given job to a thread pool or process pool and notifies the scheduler when the job finishes, so that the scheduler can fire the corresponding event.
  • Schedulers: take the three components above as parameters, create a scheduler instance, and coordinate the operation of the three components.

2. Scheduler component (schedulers)

BlockingScheduler blocking scheduler

Use this when the scheduler is the only thing running in the process. Calling start() blocks the current thread and does not return until the scheduler is shut down.


from apscheduler.schedulers.blocking import BlockingScheduler
import time

scheduler = BlockingScheduler()

def job1():
    print("%s: Perform a task" % time.asctime())

# Run job1 every 3 seconds
scheduler.add_job(job1, 'interval', seconds=3)
# Blocks the current thread until the scheduler is shut down
scheduler.start()

BackgroundScheduler Background Scheduler

The current thread is not blocked; the scheduler runs in a background thread.


from apscheduler.schedulers.background import BackgroundScheduler
import time

scheduler = BackgroundScheduler()

def job1():
    print("%s: Perform a task" % time.asctime())

# Run job1 every 3 seconds
scheduler.add_job(job1, 'interval', seconds=3)
# Returns immediately; jobs run in the background
scheduler.start()
# Keep the main thread alive for 10 seconds
time.sleep(10)

Note: the main thread sleeps for 10 seconds and then the program exits, which also ends the background scheduler.

AsyncIOScheduler AsyncIO Scheduler

Use this when your application is built on asyncio.


from apscheduler.schedulers.asyncio import AsyncIOScheduler
import asyncio
...
...
try:
 asyncio.get_event_loop().run_forever()
except (KeyboardInterrupt, SystemExit):
 pass

GeventScheduler Gevent Scheduler

Use this when your application uses gevent.


from apscheduler.schedulers.gevent import GeventScheduler
...
...
g = scheduler.start()
try:
 g.join()
except (KeyboardInterrupt, SystemExit):
 pass

TornadoScheduler Tornado Scheduler

Suitable for building Tornado applications

TwistedScheduler Twisted Scheduler

Suitable for building Twisted applications

QtScheduler Qt Scheduler

Suitable for building Qt applications

3. Trigger component (triggers)

date: runs the job exactly once, at a specific point in time

run_date(datetime|str): the date/time at which to run the job


scheduler.add_job(my_job, 'date', run_date=datetime(2019, 7, 12, 15, 30, 5), args=[])
scheduler.add_job(my_job, 'date', run_date="2019-07-12", args=[])

timezone(datetime.tzinfo|str): the time zone for run_date, used if run_date does not already carry one

interval: runs the job repeatedly, once per fixed time interval


(weeks=0, days=0, hours=0, minutes=0, seconds=0, start_date=None, end_date=None, timezone=None)
scheduler.add_job(my_job, 'interval', hours=2)
scheduler.add_job(my_job, 'interval', hours=2, start_date='2017-09-08 21:30:00', end_date='2018-06-15 21:30:00')

cron: runs the job periodically whenever the current time matches all the given fields, like a crontab entry


(year=None, month=None, day=None, week=None, day_of_week=None, hour=None, minute=None, second=None, start_date=None, end_date=None, timezone=None)

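Fields more significant than the least significant explicitly defined field default to *, while less significant fields default to their minimum values. The exceptions are week and day_of_week, which always default to *.

For example, day=1, minute=20 is equivalent to year='*', month='*', day=1, week='*', day_of_week='*', hour='*', minute=20, second=0. The job runs at minute 20 of every hour on the first day of every month.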

Expression types

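Expression  Field  Description
*           any    Wildcard; fires on every value. Example: minute='*' fires every minute
*/a         any    Fires every a values, starting from the minimum
a-b         any    Fires on any value within the range a-b
a-b/c       any    Fires every c values within the range a-b
xth y       day    Fires on the x-th occurrence of weekday y within the month
last x      day    Fires on the last occurrence of weekday x within the month
last        day    Fires on the last day of the month
x,y,z       any    Combined expression; fires on any match, and can combine single values or any of the expressions above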

Note: if the interval is shorter than the job's execution time, a new run becomes due while the previous one is still executing. Because each job may have only one running instance by default, the new run is skipped until the previous one has finished. Set max_instances to allow more instances of the same job to run concurrently; the default is 1.

4. Configure the scheduler

By default, the scheduler uses a ThreadPoolExecutor with a maximum of 10 threads and a MemoryJobStore. To configure these yourself, use one of the methods below.

Requirements:

Use two job stores and two executors, change the default parameters for new jobs, and set the scheduler's time zone:

  • A MongoDBJobStore named "mongo"
  • An SQLAlchemyJobStore named "default"
  • A ThreadPoolExecutor named "default", with a maximum of 20 threads
  • A ProcessPoolExecutor named "processpool", with a maximum of 5 processes
  • UTC as the scheduler's time zone
  • Coalescing turned off for new jobs by default
  • A default maximum of 3 instances for new jobs

Method 1:


from pytz import utc
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.mongodb import MongoDBJobStore
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
jobstores = {
 'mongo': MongoDBJobStore(),
 'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
executors = {
 'default': ThreadPoolExecutor(20),
 'processpool': ProcessPoolExecutor(5)
}
job_defaults = {
 'coalesce': False,
 'max_instances': 3
}
scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)

Method 2:


from apscheduler.schedulers.background import BackgroundScheduler
# The "apscheduler." prefix is hard coded
scheduler = BackgroundScheduler({
 'apscheduler.jobstores.mongo': {
   'type': 'mongodb'
 },
 'apscheduler.jobstores.default': {
  'type': 'sqlalchemy',
  'url': 'sqlite:///jobs.sqlite'
 },
 'apscheduler.executors.default': {
  'class': 'apscheduler.executors.pool:ThreadPoolExecutor',
  'max_workers': '20'
 },
 'apscheduler.executors.processpool': {
  'type': 'processpool',
  'max_workers': '5'
 },
 'apscheduler.job_defaults.coalesce': 'false',
 'apscheduler.job_defaults.max_instances': '3',
 'apscheduler.timezone': 'UTC',
})

Method 3:


from pytz import utc
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ProcessPoolExecutor
jobstores = {
 'mongo': {'type': 'mongodb'},
 'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
executors = {
 'default': {'type': 'threadpool', 'max_workers': 20},
 'processpool': ProcessPoolExecutor(max_workers=5)
}
job_defaults = {
 'coalesce': False,
 'max_instances': 3
}
scheduler = BackgroundScheduler()
# .. You can add tasks here 
scheduler.configure(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)

5. Start the scheduler

Call start() to start the scheduler. For every scheduler except BlockingScheduler, start() returns immediately and the subsequent code continues to run.

With BlockingScheduler, any code that needs to run must be executed before start() is called, because start() blocks.

1. Add a task

1. Call add_job(). It accepts max_instances, the maximum number of concurrently running instances of one job.

If the scheduler is interrupted and resumed later, a job may have N runs that were missed in the meantime:
coalesce=True: the missed runs are rolled into one, and the job is executed once
coalesce=False: each missed run is executed, subject to misfire_grace_time
misfire_grace_time: the number of seconds a run may be late and still be executed. If a run is later than this by the time the scheduler gets to it, that run is skipped.

2. Use the scheduled_job() decorator

Note: if add_job() is called without a trigger argument, the job runs once, immediately.

2. Remove the task



3. Pause and resume a job



4. Scheduler operations



Catching exceptions


# Set the apscheduler logger to DEBUG level so that exception information is logged
import logging
logging.basicConfig()
logging.getLogger('apscheduler').setLevel(logging.DEBUG)
