Spring Boot: implementing concurrent scheduled tasks and dynamic scheduled tasks (this article is all you need)

  • 2021-07-13 05:25:14
  • OfStack

1. Ways to implement scheduled tasks in Java development today

1. Stand-alone deployment mode

Timer: a scheduling class that ships with the JDK. It can run a task at a fixed frequency, but its features are limited and it cannot handle complex scheduling.
ScheduledExecutorService: also ships with the JDK; a scheduled-task class built on a thread pool. Each scheduled task is handed to a thread in the pool, so tasks run concurrently and do not affect one another.
Spring Task: the task scheduling tool provided by Spring. It supports both annotations and configuration files, supports cron expressions, and is simple to use yet powerful.
Quartz: a powerful task scheduler that can express more complex schedules, such as the 1st of every month, early every morning, or every Friday. It also supports distributed scheduling; its drawback is that configuration is somewhat more complicated.
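To make the concurrency claim for ScheduledExecutorService concrete, here is a minimal JDK-only sketch. The class name PooledSchedulingDemo and the two-party barrier are illustrative choices, not part of any library: the barrier can only open if both tasks are running at the same moment, which a pool of two threads allows.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

final class PooledSchedulingDemo {

    /** Schedules two tasks on a two-thread pool; returns true if they ran at the same time. */
    static boolean tasksOverlap() {
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);
        // The barrier opens only when two threads are waiting on it simultaneously.
        CyclicBarrier barrier = new CyclicBarrier(2);
        Callable<Boolean> task = () -> {
            try {
                barrier.await(5, TimeUnit.SECONDS);
                return true;
            } catch (Exception e) {
                return false; // timed out: the other task was not running concurrently
            }
        };
        try {
            Future<Boolean> first = pool.schedule(task, 0, TimeUnit.MILLISECONDS);
            Future<Boolean> second = pool.schedule(task, 0, TimeUnit.MILLISECONDS);
            return first.get(10, TimeUnit.SECONDS) && second.get(10, TimeUnit.SECONDS);
        } catch (Exception e) {
            return false;
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("tasks overlapped: " + tasksOverlap());
    }
}
```

With a pool of one thread the second task could not start until the first had finished, so the barrier would time out instead of opening.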

2. Distributed cluster mode (not covered in detail here, only mentioned briefly)

Questions:

I. How do we prevent a scheduled task from being executed more than once?
II. How do we remove the task's single point of failure and achieve failover?

Some simple ideas for question I:

1. Pin the scheduled tasks to one fixed machine. This effectively avoids duplicate execution, but the drawback is a single point of failure.
2. Use Redis key expiration together with a distributed lock.
3. Use the locking mechanism of MySQL or a similar database.
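The second idea, a lock that expires on its own, can be sketched without a Redis server. The class below is a hypothetical in-memory stand-in, not a Redis client: it only imitates the behavior of a command such as `SET key value NX PX ttl`, where the first caller takes the key and the key disappears after the time to live. In a real cluster the map would be replaced by the shared Redis instance.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongSupplier;

/** In-memory stand-in for a lock with expiry (assumption: real code would call Redis). */
final class ExpiringLock {
    private final ConcurrentHashMap<String, Long> expiryByKey = new ConcurrentHashMap<>();
    private final LongSupplier clockMillis;

    ExpiringLock(LongSupplier clockMillis) {
        this.clockMillis = clockMillis;
    }

    /** Returns true if the caller acquired the lock; an expired lock can be taken over. */
    boolean tryLock(String key, long ttlMillis) {
        long now = clockMillis.getAsLong();
        boolean[] acquired = {false};
        // compute() runs atomically per key, so only one caller can win.
        expiryByKey.compute(key, (k, expiry) -> {
            if (expiry == null || expiry <= now) {
                acquired[0] = true;
                return now + ttlMillis;
            }
            return expiry;
        });
        return acquired[0];
    }

    public static void main(String[] args) {
        ExpiringLock lock = new ExpiringLock(System::currentTimeMillis);
        // Two "nodes" compete for the same job; only the node that gets the lock runs it.
        System.out.println("node A runs job: " + lock.tryLock("daily-report", 30_000));
        System.out.println("node B runs job: " + lock.tryLock("daily-report", 30_000));
    }
}
```

The expiry is what prevents a crashed node from holding the lock forever, at the cost of having to choose a time to live longer than the job itself.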

Mature solutions:

1. Quartz: see the article "Quartz Distributed" (https://www.ofstack.com/article/102869.htm).
2. elastic-job: (https://github.com/elasticjob/elastic-job-lite) an elastic distributed task scheduling system developed by Dangdang. It uses ZooKeeper for distributed coordination and provides high availability and task sharding.
3. xxl-job: (https://github.com/xuxueli/xxl-job) a lightweight distributed task scheduling platform released by a developer at Dianping.
4. saturn: (https://github.com/vipshop/Saturn) a distributed, fault-tolerant, highly available job scheduling service framework provided by Vipshop.

2. Implementing scheduled tasks with Spring Task (based on Spring Boot here)

1. Simple timed task implementation

Usage:

Add the @EnableScheduling annotation to turn on support for scheduled tasks. Then annotate a method with @Scheduled and choose a timing strategy such as cron, fixedRate or fixedDelay.
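The difference between fixedRate and fixedDelay is easy to mix up. Their meaning matches the JDK's scheduleAtFixedRate and scheduleWithFixedDelay, so it can be shown with a JDK-only sketch and no Spring context. The class and method names below are made up for the illustration, and the measured gaps are approximate because they depend on thread timing.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

final class RateVersusDelay {

    /** Runs a task three times and returns the two gaps between its start times, in ms. */
    static long[] startGaps(boolean fixedRate, long intervalMillis, long workMillis) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        List<Long> starts = new CopyOnWriteArrayList<>();
        CountDownLatch threeRuns = new CountDownLatch(3);
        Runnable task = () -> {
            if (threeRuns.getCount() == 0) {
                return;                       // ignore any run after the third
            }
            starts.add(System.nanoTime());
            try {
                Thread.sleep(workMillis);     // simulated work
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            threeRuns.countDown();
        };
        try {
            if (fixedRate) {
                // like @Scheduled(fixedRate = ...): interval measured from start to start
                scheduler.scheduleAtFixedRate(task, 0, intervalMillis, TimeUnit.MILLISECONDS);
            } else {
                // like @Scheduled(fixedDelay = ...): interval measured from end to next start
                scheduler.scheduleWithFixedDelay(task, 0, intervalMillis, TimeUnit.MILLISECONDS);
            }
            threeRuns.await(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            scheduler.shutdownNow();
        }
        return new long[] {
            (starts.get(1) - starts.get(0)) / 1_000_000,
            (starts.get(2) - starts.get(1)) / 1_000_000
        };
    }

    public static void main(String[] args) {
        long[] rate = startGaps(true, 200, 50);
        long[] delay = startGaps(false, 200, 50);
        System.out.println("fixed rate gaps : " + rate[0] + " ms, " + rate[1] + " ms");
        System.out.println("fixed delay gaps: " + delay[0] + " ms, " + delay[1] + " ms");
    }
}
```

With a 200 ms interval and 50 ms of work, the fixed-rate gaps should come out near 200 ms and the fixed-delay gaps near 250 ms, since the delay only starts counting once the previous run has finished.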

Disadvantages in use:

1. All scheduled tasks share the same scheduling thread, so one task can block the others and throughput is poor.
2. Once a task is blocked, a job that depends on running at a particular time loses its meaning. For example, if a calculation that should run at 12 o'clock every day is held up until 1 o'clock, the result is not the one we wanted.

Advantages of use:

1. Simple configuration.
2. Well suited to scenarios where a single background thread runs periodic tasks and the tasks must execute in order.

Source code analysis:


// Scheduler used by default (from Spring's ScheduledTaskRegistrar#scheduleTasks)
if (this.taskScheduler == null) {
 this.localExecutor = Executors.newSingleThreadScheduledExecutor();
 this.taskScheduler = new ConcurrentTaskScheduler(this.localExecutor);
}
// newSingleThreadScheduledExecutor sets the core pool size to 1; in other words, execution is single-threaded
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
 return new DelegatedScheduledExecutorService
  (new ScheduledThreadPoolExecutor(1));
}
// A DelayedWorkQueue stores the tasks, which is what makes delayed and periodic execution possible
public ScheduledThreadPoolExecutor(int corePoolSize) {
 super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
  new DelayedWorkQueue());
}
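The effect of that single thread can be reproduced with the JDK alone. The following sketch (class and method names are illustrative) schedules a slow task and a second task that is due 50 ms later, then reports when the second task really started. A pool size of 1 behaves like the default scheduler shown above.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

final class SingleThreadBlockingDemo {

    /** Returns how many milliseconds after submission a task due at 50 ms really started. */
    static long fastTaskStartDelay(int poolSize) {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(poolSize);
        long submitted = System.nanoTime();
        Runnable slow = () -> {
            try {
                Thread.sleep(300); // a slow job occupies its thread for 300 ms
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        Callable<Long> fast = () -> (System.nanoTime() - submitted) / 1_000_000;
        try {
            scheduler.schedule(slow, 0, TimeUnit.MILLISECONDS);
            Future<Long> started = scheduler.schedule(fast, 50, TimeUnit.MILLISECONDS);
            return started.get(10, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("1 thread : fast task started after " + fastTaskStartDelay(1) + " ms");
        System.out.println("2 threads: fast task started after " + fastTaskStartDelay(2) + " ms");
    }
}
```

On one thread the second task cannot start before the slow one has finished, which is exactly the "12 o'clock job runs at 1 o'clock" problem in miniature.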

2. Implement concurrent timing tasks

Usage:

Mode 1: From the previous section we know that scheduled tasks block one another, and that this is determined by the thread pool the scheduler is configured with. The fix is therefore easy: replace that pool. Straight to the code:


import java.util.concurrent.ThreadPoolExecutor;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.annotation.SchedulingConfigurer;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.scheduling.config.ScheduledTaskRegistrar;

@Configuration
public class ScheduledConfig implements SchedulingConfigurer {

 @Override
 public void configureTasks(ScheduledTaskRegistrar scheduledTaskRegistrar) {
  // The quick and crude way: pass an executor directly
  //scheduledTaskRegistrar.setScheduler(Executors.newScheduledThreadPool(5));
  // Or use a custom scheduler bean, which makes threads easier to manage and maintain.
  // Calling the @Bean method inside a @Configuration class returns the shared bean instance.
  scheduledTaskRegistrar.setTaskScheduler(getMyThreadPoolTaskScheduler());
 }

 @Bean(name = "myThreadPoolTaskScheduler")
 public TaskScheduler getMyThreadPoolTaskScheduler() {
  ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
  taskScheduler.setPoolSize(10);
  taskScheduler.setThreadNamePrefix("Haina-Scheduled-");
  taskScheduler.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
  // On shutdown, wait for the currently scheduled tasks to complete
  taskScheduler.setWaitForTasksToCompleteOnShutdown(true);
  // Maximum time to wait, in seconds
  taskScheduler.setAwaitTerminationSeconds(60);
  return taskScheduler;
 }
}

Mode 2: Mode 1 essentially replaces the default thread pool used by the task scheduler. Mode 2 leaves the scheduler's default thread pool alone and instead hands each task over to an asynchronous thread pool for execution.

Start by enabling asynchronous execution with @EnableAsync.
Then add @Async to the scheduled method. When Spring finds no suitable executor bean, the default executor is SimpleAsyncTaskExecutor (Spring Boot 2.1 and later auto-configure a ThreadPoolTaskExecutor instead). SimpleAsyncTaskExecutor creates a new thread for every task, so it can keep creating threads and is quite capable of exhausting the server's memory. It does have its own throttling mechanism, a concurrency limit, which is not covered here; read the source if you are interested.
To keep better control over thread usage in a project, define your own thread pool and reference it by bean name, for example @Async("myThreadPoolTaskExecutor").

Enough talk; straight to the code:


 // Goes in a Spring-managed bean; requires @EnableScheduling and @EnableAsync
 @Scheduled(fixedRate = 1000 * 10, initialDelay = 1000 * 20)
 @Async("myThreadPoolTaskExecutor")
 //@Async
 public void scheduledTest02() {
  System.out.println(Thread.currentThread().getName() + "--->xxxxx--->" + Thread.currentThread().getId());
 }

 // Custom thread pool (declare it in a @Configuration class)
 @Bean(name = "myThreadPoolTaskExecutor")
 public TaskExecutor getMyThreadPoolTaskExecutor() {
  ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
  taskExecutor.setCorePoolSize(20);
  taskExecutor.setMaxPoolSize(200);
  taskExecutor.setQueueCapacity(25);
  taskExecutor.setKeepAliveSeconds(200);
  taskExecutor.setThreadNamePrefix("Haina-ThreadPool-");
  // Policy for tasks rejected when the queue is full and no more threads can be created.
  // The JDK offers AbortPolicy (the default), CallerRunsPolicy, DiscardPolicy and DiscardOldestPolicy;
  // CallerRunsPolicy runs the rejected task on the submitting thread.
  taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
  // On shutdown, wait for submitted tasks to complete
  taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
  // Maximum time to wait, in seconds
  taskExecutor.setAwaitTerminationSeconds(60);
  taskExecutor.initialize();
  return taskExecutor;
 }

Lessons from using thread pools (a dedicated article will cover this later)

Java provides ThreadPoolExecutor and ScheduledThreadPoolExecutor; Spring's counterparts are ThreadPoolTaskExecutor and ThreadPoolTaskScheduler, which add features on top of the originals and are easier to use and control in a Spring environment.
A custom thread pool avoids the memory exhaustion, blocking and similar problems the default pools can cause, and can be tuned to the characteristics of your own service.
A custom thread pool also makes the threads in a project easier to manage, maintain and monitor.
Even outside Spring, avoid the ready-made thread pools that Java offers by default; they have many pitfalls. The Alibaba Java coding guidelines say the same, and the big companies are worth trusting on this.
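As an illustration of the last point, here is a minimal sketch of building a pool by hand with explicit bounds, assuming that "the thread pools provided by java by default" refers to the Executors factory methods. The names BoundedPoolDemo and "demo-pool-" are invented for the example. It also shows what CallerRunsPolicy does once the single worker and the single queue slot are both taken.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class BoundedPoolDemo {

    /** Builds a pool whose thread count and queue length are both bounded explicitly. */
    static ThreadPoolExecutor newBoundedPool(int core, int max, int queueCapacity) {
        AtomicInteger counter = new AtomicInteger();
        return new ThreadPoolExecutor(
                core, max, 60, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(queueCapacity),        // bounded queue
                r -> new Thread(r, "demo-pool-" + counter.incrementAndGet()),
                new ThreadPoolExecutor.CallerRunsPolicy());     // overflow runs on the submitter
    }

    /** Saturates a 1-thread, 1-slot pool; returns the name of the thread that ran the overflow task. */
    static String overflowThreadName() {
        ThreadPoolExecutor pool = newBoundedPool(1, 1, 1);
        CountDownLatch release = new CountDownLatch(1);
        String[] ranOn = new String[1];
        try {
            pool.execute(() -> await(release));   // occupies the only worker thread
            pool.execute(() -> { });              // fills the only queue slot
            // Rejected by the pool, so CallerRunsPolicy runs it right here, synchronously.
            pool.execute(() -> ranOn[0] = Thread.currentThread().getName());
            return ranOn[0];
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    private static void await(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("overflow task ran on: " + overflowThreadName());
    }
}
```

Because the overflow task runs on the submitting thread, the submitter is slowed down instead of the queue growing without limit, which is the usual argument for bounding the pool.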

3. Implementing dynamic scheduled tasks

Problem:
Scheduled tasks set up with the @Scheduled annotation are fixed, but we often need to change the schedule or start and stop a task at run time. The annotation is inconvenient for this because the cron expression it takes must be a constant, so changing the timing parameters means stopping the service and redeploying.

Solution:

Mode 1: Implement the SchedulingConfigurer interface, override configureTasks, and supply your own Trigger. The core method is addTriggerTask(Runnable task, Trigger trigger). Note that with this approach a changed configuration value only takes effect when the next execution time is computed, that is, after the run that is already scheduled; it is not applied the moment the value changes. If a change has to take effect immediately, the task must be rescheduled at the point where the value is modified.


import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.annotation.SchedulingConfigurer;
import org.springframework.scheduling.config.ScheduledTaskRegistrar;
import org.springframework.scheduling.support.CronTrigger;

@Configuration
public class ScheduledConfig implements SchedulingConfigurer {

 @Autowired
 private TaskScheduler myThreadPoolTaskScheduler;

 @Override
 public void configureTasks(ScheduledTaskRegistrar scheduledTaskRegistrar) {
  //scheduledTaskRegistrar.setScheduler(Executors.newScheduledThreadPool(5));
  scheduledTaskRegistrar.setTaskScheduler(myThreadPoolTaskScheduler);
  // The trigger is evaluated again after every run, so the frequency can change at run time
  scheduledTaskRegistrar.addTriggerTask(
    // 1. The task body (Runnable)
    () -> System.out.println("cccccccccccccccc--->" + Thread.currentThread().getId()),
    // 2. The execution schedule (Trigger)
    triggerContext -> {
     // 2.1 Fetch the cron expression dynamically, e.g. from a database (hard-coded here)
     String cron = "0/2 * * * * ?";
     // 2.2 Validity check
     // if (StringUtils.isEmpty(cron)) {
     //  // Omitted code ..
     // }
     // 2.3 Return the next execution time (Date)
     return new CronTrigger(cron).nextExecutionTime(triggerContext);
    }
  );
 }
}
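The caveat about delayed updates follows from when the setting is read. The JDK-only sketch below imitates the trigger mechanism under simplified assumptions: a hypothetical ReReadDelayDemo reads a delay value (standing in for the value from the database) only when it schedules the next run, just as a Trigger is only consulted when the next execution time is computed.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

final class ReReadDelayDemo {
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private final AtomicLong delayMillis;          // stands in for a value stored in a database
    private final List<Long> delaysUsed = new CopyOnWriteArrayList<>();
    private final CountDownLatch remainingRuns;

    ReReadDelayDemo(long initialDelayMillis, int runs) {
        this.delayMillis = new AtomicLong(initialDelayMillis);
        this.remainingRuns = new CountDownLatch(runs);
    }

    void start() {
        scheduleNext();
    }

    void setDelayMillis(long newDelay) {
        delayMillis.set(newDelay);
    }

    private void scheduleNext() {
        long delay = delayMillis.get();            // the setting is read only here
        delaysUsed.add(delay);
        scheduler.schedule(() -> {
            remainingRuns.countDown();             // the "task body"
            if (remainingRuns.getCount() > 0) {
                scheduleNext();                    // compute the next run, as a Trigger would
            }
        }, delay, TimeUnit.MILLISECONDS);
    }

    /** Waits for all runs, stops the scheduler and returns the delay used for each run. */
    List<Long> awaitAndStop() {
        try {
            remainingRuns.await(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        scheduler.shutdownNow();
        return delaysUsed;
    }

    public static void main(String[] args) {
        ReReadDelayDemo demo = new ReReadDelayDemo(200, 3);
        demo.start();              // the first run is already scheduled 200 ms ahead
        demo.setDelayMillis(10);   // changing the setting does not move that first run
        System.out.println(demo.awaitAndStop());
    }
}
```

The first recorded delay is still the old value even though the setting changed immediately after start(); only later runs pick up the new one. To apply a change at once, the pending run would have to be cancelled and scheduled again.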

Mode 2: Use the ThreadPoolTaskScheduler class, which makes it possible to add and remove tasks dynamically and, of course, to adjust the execution frequency as well.

First, get to know this scheduling class. It is essentially Spring's improved wrapper around Java's ScheduledThreadPoolExecutor. The main improvements are:

1. It provides a default configuration. Because it wraps ScheduledThreadPoolExecutor, the only parameter that needs a default is poolSize.
2. It supports custom schedules by accepting a Trigger parameter.
3. It improves error handling. If a repeating task throws, the exception is logged rather than propagated, so the next run is not affected. If a one-off task throws, the exception is propagated.

By the way, ThreadPoolTaskExecutor improves on ThreadPoolExecutor in these ways:

1. It provides a default configuration. The native ThreadPoolExecutor has defaults only for ThreadFactory and RejectedExecutionHandler.
2. It implements the AsyncListenableTaskExecutor interface, which supports adding success and failure callbacks to a FutureTask; the matching callback runs when the task succeeds or fails.
3. Because it is a Spring utility class, a thrown RejectedExecutionException is converted into Spring's TaskRejectedException (a minor point).
4. It provides a default ThreadFactory implementation that can be configured directly through parameters.
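The error-handling improvement of ThreadPoolTaskScheduler matters because of how the plain JDK scheduler behaves: when a periodic task throws, all later runs are silently suppressed. The sketch below shows that behavior and a hand-written wrapper that logs and carries on. The wrapper is only an approximation of the idea, not Spring's actual implementation, and all names are illustrative.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

final class PeriodicErrorDemo {

    /** Wraps a task so that an exception is reported instead of ending the periodic schedule. */
    static Runnable loggingErrors(Runnable task) {
        return () -> {
            try {
                task.run();
            } catch (RuntimeException e) {
                System.err.println("scheduled task failed: " + e.getMessage());
            }
        };
    }

    /** Schedules a task that always throws and returns how many times it ran. */
    static int runsOfFailingTask(boolean wrapped, int wantedRuns) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger runs = new AtomicInteger();
        CountDownLatch reached = new CountDownLatch(wantedRuns);
        Runnable failing = () -> {
            runs.incrementAndGet();
            reached.countDown();
            throw new IllegalStateException("boom");
        };
        Runnable task = wrapped ? loggingErrors(failing) : failing;
        try {
            ScheduledFuture<?> future =
                    scheduler.scheduleAtFixedRate(task, 0, 10, TimeUnit.MILLISECONDS);
            if (wrapped) {
                reached.await(10, TimeUnit.SECONDS);
            } else {
                future.get(10, TimeUnit.SECONDS); // completes exceptionally after the first failure
            }
        } catch (ExecutionException expected) {
            // raw scheduler: the exception ended the periodic schedule
        } catch (InterruptedException | TimeoutException e) {
            throw new IllegalStateException(e);
        } finally {
            scheduler.shutdownNow();
        }
        return runs.get();
    }

    public static void main(String[] args) {
        System.out.println("raw task ran     : " + runsOfFailingTask(false, 3) + " time(s)");
        System.out.println("wrapped task ran : " + runsOfFailingTask(true, 3) + " time(s)");
    }
}
```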

With all that said, straight to the code:


 import java.util.concurrent.ScheduledFuture;

 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.context.annotation.Bean;
 import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
 import org.springframework.scheduling.support.CronTrigger;
 import org.springframework.stereotype.Component;

 @Component
 public class DynamicTimedTask {

  private static final Logger logger = LoggerFactory.getLogger(DynamicTimedTask.class);

  // To manage everything through one shared scheduler, inject the one created earlier instead:
  //@Autowired
  //@Qualifier("myThreadPoolTaskScheduler")
  //private ThreadPoolTaskScheduler myThreadPoolTaskScheduler;

  // Handle to the scheduled task, used to cancel it later
  private ScheduledFuture<?> future;

  @Autowired
  private ThreadPoolTaskScheduler threadPoolTaskScheduler;

  // Instantiate a thread pool task scheduler; a customized ThreadPoolTaskScheduler can be used instead.
  // The method is static so that creating the bean does not depend on this component,
  // which avoids a circular reference with the @Autowired field above.
  @Bean
  public static ThreadPoolTaskScheduler threadPoolTaskScheduler() {
   return new ThreadPoolTaskScheduler();
  }

  /**
   * Start the scheduled task.
   * @return true if the task was scheduled
   */
  public boolean startCron() {
   boolean flag = false;
   // Fetch the cron expression dynamically, e.g. from a database (hard-coded here)
   String cron = "0/2 * * * * ?";
   // schedule(...) expects a Trigger, so wrap the expression in a CronTrigger
   future = threadPoolTaskScheduler.schedule(new CheckModelFile(), new CronTrigger(cron));
   if (future != null) {
    flag = true;
    logger.info("Scheduled check of the training model file: task started successfully.");
   } else {
    logger.info("Scheduled check of the training model file: task failed to start.");
   }
   return flag;
  }

  /**
   * Stop the scheduled task.
   * @return true if the task is stopped after the call
   */
  public boolean stopCron() {
   boolean flag = false;
   if (future != null) {
    boolean cancel = future.cancel(true);
    if (cancel) {
     flag = true;
     logger.info("Scheduled check of the training model file: task stopped successfully.");
    } else {
     logger.info("Scheduled check of the training model file: task failed to stop.");
    }
   } else {
    flag = true;
    logger.info("Scheduled check of the training model file: task was already stopped.");
   }
   return flag;
  }

  class CheckModelFile implements Runnable {

   @Override
   public void run() {
    // Write your own business logic here
    System.out.println("Check the model file!");
   }
  }

 }
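The class above starts and stops a task but does not show the frequency change mentioned for Mode 2. One way to get it, sketched here with the JDK scheduler rather than ThreadPoolTaskScheduler so that it runs without Spring, is to cancel the current ScheduledFuture and schedule the task again with the new period. RestartableJob and its methods are hypothetical names for this illustration.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class RestartableJob {
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private final Runnable task;
    private ScheduledFuture<?> future;   // handle to the currently scheduled run, if any

    RestartableJob(Runnable task) {
        this.task = task;
    }

    /** Starts the job, or restarts it with a new period if it is already running. */
    synchronized void start(long periodMillis) {
        stop();
        future = scheduler.scheduleAtFixedRate(task, 0, periodMillis, TimeUnit.MILLISECONDS);
    }

    /** Cancels the job; returns false if it was not running. */
    synchronized boolean stop() {
        if (future == null) {
            return false;
        }
        future.cancel(false);            // let a run that is in progress finish
        future = null;
        return true;
    }

    synchronized boolean isRunning() {
        return future != null && !future.isCancelled();
    }

    void shutdown() {
        scheduler.shutdownNow();
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger runs = new AtomicInteger();
        RestartableJob job = new RestartableJob(runs::incrementAndGet);
        job.start(1000);                 // once per second
        job.start(100);                  // change the frequency: cancel and schedule again
        Thread.sleep(500);
        job.stop();
        job.shutdown();
        System.out.println("runs so far: " + runs.get());
    }
}
```

Clearing the handle in stop() also keeps a second start() from leaving an earlier schedule running with no way to cancel it.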

4. Summary

That covers the basic use of scheduled tasks with Spring Task. There are bound to be some mistakes or misunderstandings here; corrections are welcome.
Scheduled tasks in a distributed cluster will be covered in a follow-up when time allows.

