Learn about spring transactions and message queues

  • 2020-05-10 18:14:32
  • OfStack

During the development process, an bug was encountered, and bug was generated because the spring transaction submitted the production message later than the message queue, resulting in incorrect data acquired when the message queue consumed the message. This article describes how the problem was created and how it was solved step by step.

1. Problems:

Scenario restore: a method in the interface that first modifies the order state and then produces the message to the message queue. The consumer of the message queue gets the message to detect the order state and finds that the order state has not changed.

Code:


@Service(orderApi)
public class OrderApiImpl implements OrderApi {
  @Resource MqService mqService;
  @OrderDao orderDao;
   
  public void push(String orderId) {
    //  Update the order status, the previous status is 1
    updateStatus(orderId, 3);
    //  Produce the message 
    mqService.produce(orderId);
  }
  public viod updateStatus(String orderId, Integer status) {
    orderDao.updateStatus(orderId, status);
  }
}

The reason for the problem: all the methods in orderApi have transactions, and the transaction type PROPAGATION_REQUIRED, so the operation of the push method on the data will be committed after all the push code is executed, and the message on the message queue has been generated before the transaction is committed, so the status of the order consumed in the message queue may be 1. To make bug more obvious, add the following at the end of the push method:


try {
  Thread.sleep(10000);
} catch (InterruptedException e) {
  // TODO Auto-generated catch block
  e.printStackTrace();
}

This will reveal that the order status 1 must have been unmodified when the consumption message was sent.  

2. Problem solving:

Solution: when updating the data, create a new thing to ensure that the transaction to update the database has been committed after the execution of the update code. (ensure that the database operation is committed before the message is generated)

According to the above scenario, the first thing that comes to my mind is to directly modify the transaction type of the updateStatus method; I changed the transaction type of this method to PROPAGATION_REQUIRES_NEW (new transaction, suspend the current transaction if it exists).

But there are two things wrong with this:

1. Forced modification of the transaction type of updateStaus, which may affect other processes.

2. Does not work, there is no new transaction in the updateStaus method.

Explanation on point 2: spring adds transactions as a dynamic proxy through BeanNameAutoProxyCreator, only adding transactions to bean objects, and now calling methods within the class does not trigger the creation of new things.

So after the above attempts, I created a new class:


@Service("orderExtApi")
public class OrderExtApiImpl {
  @Resource OrderApi orderApi;
   
  public void updateStatusNewPropagation(String orderId) {
    orderApi.updateStatus(orderId);
  }
}

And add the transaction PROPAGATION_REQUIRES_NEW for the updateStatusNewPropagation method

This class is just for the purpose of creating a new transaction for the updateStaus method in orderApi.

ok, that's it, bug has solved it.

But there is still a problem in the code: the operation to the database has been committed, and it is still wrong for the business logic to have an exception to the production message. So you need to check if the message generation is complete.

The final code in orderApi is as follows:


@Service(orderApi)
public class OrderApiImpl implements OrderApi {
  @Resource MqService mqService;
  @Resource OrderDao orderDao;
  @Resource OrderExtApiImpl orderExtApi;
   
  public void push(String orderId) {
    //  Update the order status, the previous status is 1
    orderExtApi.updateStatusNewPropagation(orderId, 3);
    //  Produce the message --produce It will detect if there is an exception   When to return to 1 When means the production message was successful 
    Response response = mqService.produce(orderId);
    if (response.getCode() != 1) {
      log.info(" Message queue production message exception: " + response.getErrorMsg())
      //  The production message is abnormal and the state is reset   Wait for the next execution 
      orderExtApi.updateStatusNewPropagation(orderId, 1);
    }
     
  }
  public viod updateStatus(String orderId, Integer status) {
    orderDao.updateStatus(orderId, status);
  }
}


Related articles: