Implementing Asynchronous Event-Driven Processing in Spring Boot

  • 2021-10-13 07:15:39
  • OfStack

Contents
  • Spring event driven
  • Source code in practice

In real-world project development, we often meet this kind of business scenario: after one piece of business logic has been processed in a transaction, another piece of business logic has to run. The pseudocode looks roughly like this:


@Service
public class ProductServiceImpl {
 ...
    public void saveProduct(Product product) {
        productMapper.saveProduct(product);
        notifyService.notify(product);
    }
 ...
}

This is a simple and common piece of business logic: save the product to the database first, then send a notification.

One day you may also need to store new products in Elasticsearch (ES), and the code may then become:


@Service
public class ProductServiceImpl {
 ...
    public void saveProduct(Product product) {
        productMapper.saveProduct(product);
        esService.saveProduct(product);
        notifyService.notify(product);
    }
 ...
}

As business requirements change, this method has to be modified again and again. There is a second problem as well: because every step runs on the same call path, a failure in the notification system means that no new products can be added.

Message middleware (a message queue) is a good fit for decoupling the business logic in this situation, but not every business system introduces one, because third-party infrastructure components bring significant operation and maintenance costs.

Spring provides an event-driven mechanism that helps us meet this requirement.

Spring event driven

Spring's event-driven mechanism consists of three parts:

  • ApplicationEvent: represents the event itself. Custom events extend this class.
  • ApplicationEventPublisher: the event sender, mainly used to publish events.
  • ApplicationListener: the event listener interface. A listener class can implement the onApplicationEvent method of ApplicationListener, or a method can be annotated with @EventListener to listen for events.

Implementing Spring event-driven processing generally takes only three steps:

  1. Define the custom event class to be published, extending ApplicationEvent.
  2. Use ApplicationEventPublisher to publish the custom event.
  3. Use @EventListener to listen for the event.

One point needs special attention: events are synchronous by default. That is, the publish call does not return until the listeners have finished processing the event. If the business method that publishes the event runs in a transaction, the listener processing takes place in the same transaction. To process events asynchronously, add @Async to the onApplicationEvent method, or to the method annotated with @EventListener. An asynchronous listener runs on another thread and therefore outside the publisher's transaction.
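To make the difference between the two modes concrete, here is a minimal plain-JDK sketch. It is not Spring's implementation: TinyEventBus and DispatchDemo are hypothetical names invented for illustration, and the optional executor only stands in for what @Async arranges. With no executor, the listener runs on the publishing thread and the publisher waits for it; with an executor, the listener runs on a separate thread.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

// Hypothetical stand-in for an event publisher; NOT Spring's implementation.
class TinyEventBus {
    private final List<Consumer<Object>> listeners = new CopyOnWriteArrayList<>();
    private final ExecutorService executor; // null means synchronous dispatch

    TinyEventBus(ExecutorService executor) {
        this.executor = executor;
    }

    void register(Consumer<Object> listener) {
        listeners.add(listener);
    }

    void publish(Object event) {
        for (Consumer<Object> listener : listeners) {
            if (executor == null) {
                // Synchronous: the caller blocks until the listener returns.
                listener.accept(event);
            } else {
                // Asynchronous: the listener is handed to another thread.
                executor.execute(() -> listener.accept(event));
            }
        }
    }
}

class DispatchDemo {
    // Publishes one event and returns the name of the thread the listener ran on.
    static String listenerThread(boolean async) {
        ExecutorService pool = async
                ? Executors.newSingleThreadExecutor(r -> new Thread(r, "listener-thread"))
                : null;
        TinyEventBus bus = new TinyEventBus(pool);
        AtomicReference<String> seen = new AtomicReference<>();
        bus.register(event -> seen.set(Thread.currentThread().getName()));
        bus.publish("product saved");
        if (pool != null) {
            pool.shutdown();
            try {
                pool.awaitTermination(5, TimeUnit.SECONDS); // wait for the async listener
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println("sync listener ran on:  " + listenerThread(false));
        System.out.println("async listener ran on: " + listenerThread(true));
    }
}
```

In the synchronous case the reported thread is the caller's own thread, which is why a slow or failing listener directly affects the publishing method.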

Source code in practice

Create an event


public class ProductEvent extends ApplicationEvent {
    public ProductEvent(Product product) {
        super(product);
    }
}

Publish Events


@Service
public class ProductServiceImpl implements IproductService {
 ...
    @Autowired
    private ApplicationEventPublisher publisher;
 
    @Override
    @Transactional(rollbackFor = Exception.class)
    public void saveProduct(Product product) {
        productMapper.saveProduct(product);
        // Publish the event, wrapped in ProductEvent so that the ProductEvent listener receives it
        publisher.publishEvent(new ProductEvent(product));
    }
    ...
}

Listen for the event


@Slf4j
@Component // the listener must be a Spring bean for @EventListener to be detected
@AllArgsConstructor
public class ProductListener {

    private final NotifyService notifyService;

    // @Async moves the listener off the publishing thread
    @Async
    @Order
    @EventListener(ProductEvent.class)
    public void notify(ProductEvent event) {
        Product product = (Product) event.getSource();
        notifyService.notify(product, "product");
    }
}

Add the @EnableAsync annotation to the Spring Boot startup class


@Slf4j
@EnableSwagger2
@SpringBootApplication
@EnableAsync
public class ApplicationBootstrap {
...
}

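When no executor is configured, Spring's @Async support falls back to SimpleAsyncTaskExecutor, which does not reuse threads and starts a new one for each task, so we define a custom thread pool in the project. A listener can select it explicitly by bean name, for example @Async("customExecutor").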


@Configuration
public class ExecutorConfig {
    /**  Number of core threads  */
    private int corePoolSize = 10;
    /**  Maximum number of threads   */
    private int maxPoolSize = 50;
    /**  Queue size   */
    private int queueCapacity = 10;
    /**  Maximum idle time of thread    */
    private int keepAliveSeconds = 150;

    @Bean("customExecutor")
    public Executor myExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(corePoolSize);
        executor.setMaxPoolSize(maxPoolSize);
        executor.setQueueCapacity(queueCapacity);
        executor.setThreadNamePrefix("customExecutor-");
        executor.setKeepAliveSeconds(keepAliveSeconds);

        // Rejection policy: how to handle new tasks once the pool has reached max size and the queue is full.
        // CallerRunsPolicy does not run the task on a pool thread; the caller's thread executes it instead.
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.initialize();
        return executor;
    }
}
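The effect of CallerRunsPolicy is easy to miss, so here is a small sketch using java.util.concurrent.ThreadPoolExecutor directly, which is the class ThreadPoolTaskExecutor wraps. The class name CallerRunsDemo and the tiny pool sizes (one thread, one queue slot) are assumptions chosen only to force a rejection quickly; they are not the values from the configuration above.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; sizes are deliberately tiny to trigger the rejection policy.
class CallerRunsDemo {
    // Submits three tasks to a pool that can hold only two (1 thread + 1 queue slot)
    // and returns the thread names the tasks ran on, in completion order.
    static List<String> run() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1),
                r -> new Thread(r, "pool-worker"),
                new ThreadPoolExecutor.CallerRunsPolicy());
        List<String> ranOn = new CopyOnWriteArrayList<>();
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocking = () -> {
            try {
                release.await(); // keep the worker busy until released
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            ranOn.add(Thread.currentThread().getName());
        };
        pool.execute(blocking); // occupies the single worker thread
        pool.execute(blocking); // waits in the queue
        // Pool and queue are both full, so this task is rejected and
        // CallerRunsPolicy runs it on the submitting thread instead.
        pool.execute(() -> ranOn.add(Thread.currentThread().getName()));
        release.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return ranOn;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

For an @Async listener this means that once the pool and its queue are saturated, the listener runs on the publishing thread, so the publisher is slowed down rather than the event being dropped.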

