Python Scheduled Tasks with APScheduler

  • 2021-08-28 20:23:26
  • OfStack

Introduction

At work we ran into the following scenario: we needed to call a cache interface on a schedule to synchronize device configuration. The first tool that comes to mind is crontab on Linux, which can run tasks at fixed times or at regular intervals. But if you want to integrate the scheduled task into a Python project as a module, or persist the tasks, crontab is clearly not a good fit. Python's APScheduler module solves these problems well, so I wrote this article to record the most basic usage of APScheduler, starting from a simple introduction, then solving task persistence, and finally combining it with another framework to build a customized scheduled-task module.

Brief introduction

First, a brief introduction to the four components of APScheduler:

  • Trigger: determines when a job should run
  • Job: the task to be executed
  • Executor: runs the job's callable
  • Scheduler: ties the other components together

With a general understanding of these concepts, let's look at a simple example:


# -*- coding: utf-8 -*-

from apscheduler.schedulers.blocking import BlockingScheduler
import time


def hello():
  print(time.strftime("%c"))


if __name__ == "__main__":
  scheduler = BlockingScheduler()
  scheduler.add_job(hello, 'interval', seconds=5)
  scheduler.start()

Sample output:


Thu Dec 3 16:01:20 2020
Thu Dec 3 16:01:25 2020
Thu Dec 3 16:01:30 2020
Thu Dec 3 16:01:35 2020
Thu Dec 3 16:01:40 2020
..........

Using the components introduced above, let's analyze the running logic of this simple example:

First, the scheduler. This example uses BlockingScheduler, which the official documentation recommends when the scheduler is the only thing running in your process. In other words, BlockingScheduler is a blocking scheduler: once started, it blocks the process and nothing else can run.

Next, the job and the trigger. These two go together because defining a job requires choosing a trigger. Here we choose the interval trigger, which runs the job at a fixed interval: the hello job is added to the scheduler and executed every 5 seconds.

Finally, the executor, which is ThreadPoolExecutor by default. It hands the job's callable to a thread pool to run, and notifies the scheduler once the job has finished.

Three built-in Trigger types:

  • date: run the job once, at a specific point in time
  • interval: run the job at fixed intervals
  • cron: run the job periodically at specific times, cron-style

Common Scheduler types:

  • BlockingScheduler: use when the scheduler is the only thing running in the process
  • BackgroundScheduler: use when the scheduler runs in the background inside an application
  • AsyncIOScheduler: use when the application uses the asyncio module
  • GeventScheduler: use when the application uses the gevent module
  • TornadoScheduler: use when building a Tornado application
  • TwistedScheduler: use when building a Twisted application
  • QtScheduler: use when building a Qt application

Common JobStore types:

  • MemoryJobStore
  • MongoDBJobStore
  • SQLAlchemyJobStore
  • RedisJobStore

Advanced use

The simple example above shows the general workflow and the role each component plays in it. The next example combines the Flask web framework with an APScheduler timer to run tasks on a schedule.


# -*- coding: utf-8 -*-

from flask import Flask, Blueprint, request
from apscheduler.executors.pool import ThreadPoolExecutor 
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.redis import RedisJobStore
import time

app = Flask(__name__)
executors = {"default": ThreadPoolExecutor(5)}
default_redis_jobstore = RedisJobStore(
    db=2,
    jobs_key="apschedulers.default_jobs",
    run_times_key="apschedulers.default_run_times",
    host="127.0.0.1",
    port=6379
)

scheduler = BackgroundScheduler(executors=executors)
scheduler.add_jobstore(default_redis_jobstore)
scheduler.start()

def say_hello():
  print(time.strftime("%c"))


@app.route("/get_job", methods=['GET'])
def get_job():
  if scheduler.get_job("say_hello_test"):
    return "YES"
  else:
    return "NO"

@app.route("/start_job", methods=["GET"])
def start_job():
  if not scheduler.get_job("say_hello_test"):
    scheduler.add_job(say_hello, "interval", seconds=5, id="say_hello_test")
    return "Started Successfully!"
  else:
    return "Start Failed: job already exists"
  
@app.route("/remove_job", methods=["GET"])
def remove_job():
  if scheduler.get_job("say_hello_test"):
    scheduler.remove_job("say_hello_test")
    return "Deleted Successfully!"
  else:
    return "Delete Failed: no such job"


if __name__ == "__main__":
  app.run(host="127.0.0.1", port=8787, debug=True)

First, look at the job store: RedisJobStore is used here, serializing jobs into a Redis database. Why set up a job store at all? Because if the scheduler program crashes, the jobs are still kept and can be reloaded afterwards. Which job store to choose depends on the scenario; the mainstream options (MySQL, MongoDB, Redis, SQLite) are all supported.

Then the scheduler: BackgroundScheduler is used here because the scheduler must not block the Flask program from receiving requests, so BackgroundScheduler runs the tasks in the background without blocking the main thread.

Finally, the endpoint logic: get_job checks whether the job exists, start_job only adds the job if it does not exist yet, and remove_job deletes it. The job itself runs the say_hello task every 5 seconds via an interval trigger.

Summary

To summarize: first, set up a job store so that jobs survive a scheduler crash and can be reloaded and resumed. Then choose an executor according to the type of job: for CPU-intensive tasks use a process pool executor; the default is a thread pool executor. Finally, configure the scheduler and start scheduling; jobs can be added before starting, and added, removed, or fetched after starting. (One thing to understand here is that the application never manipulates the job store, jobs, or executors directly; the scheduler provides the appropriate interfaces for all of this.)

APScheduler is a good scheduled-task library: jobs can be added and removed dynamically, and several trigger types are supported, which is its main advantage. Conversely, for static tasks, tools like crontab on Linux are perfectly adequate. I will keep updating these notes; if you have questions, feel free to raise them for discussion.

That covers the details of using APScheduler in Python; for more on the topic, please see the other related articles on this site!

