Spring TransactionalEventListener Transaction Uncommitted Unread Data Resolution

  • 2021-11-13 07:50:29
  • OfStack

Catalog 1. Background 2. Problem Analysis 2.1, mysql Isolation Level 2.2, Problem Cause Analysis 3. Problem Solution 3.1, Approach 13.2, Approach 24. Use Cases

1. Background

Business process, found the following problems, code 1 is the original code can be executed normally, code 2 is after iteration 1 abnormal execution code

Code 1: After the following code opens the thread, the code executes normally

ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(5));
 
@Transactional
public Long test() {
  // ......
  //  Insert record 
  Long studentId = studentService.insert(student);
  //  Asynchronous thread 
  writeStatisticsData(studentId);
  return studentId;
}
 
private void writeStatisticsData(Long studentId) {
  executor.execute(() -> {
    Student student = studentService.findById(studentId);
    //........
  });
}
Code 2: The following code does not execute properly after opening the thread

@Transactional
public Long test() {
  // ......
  //  Insert record 
  Long studentId = studentService.insert(student);
   //  Asynchronous thread 
  writeStatisticsData(studentId);
  //  Insert Student Address Record 
  Long addressId = addressService.insert(address);
  return studentId;
}
 
private void writeStatisticsData(Long studentId) {
  executor.execute(() -> {
    Student student = studentService.findById(studentId);
    //........
  });
}

2. Problem analysis

The spring transaction is used here, and obviously the isolation level of the transaction needs to be considered

2.1. mysql Isolation Level

View the mysql isolation level


SELECT @@tx_isolation;
READ-COMMITTED

Read commit, that is, during data insertion in transaction A, Transaction B can't read the data inserted by A before A commits, while B can read the data inserted by A after A commits, that is, Read Committed can't guarantee that the same data can be read every time in a transaction, because other concurrent transactions may modify the data just read after each read.

2.2. Analysis of the cause of the problem

Code 1 Why it works properly

Because the isolation level of mysql transactions is read commit, After the test method starts an asynchronous thread, The asynchronous thread also starts the transaction and reads the student record inserted in the test method as a reader. However, the test method has committed the transaction at this time, so the student record can be read (that is, the student record can be read in the asynchronous method). However, this code has risks. If the transaction is committed at 1 o'clock later, the asynchronous thread may not read the student record.

Why Code 2 doesn't work properly

After the above analysis, it is obvious that the student record cannot be read in the asynchronous method. Because Code 2 performs other operations under the asynchronous thread, which delays the commit of transactions in the test method, Code 2 cannot run normally.

3. Problem Solutions

The solution is to do other processing (such as asynchronous message processing, etc.) after the transaction is submitted. Here, we still start from the process of Spring executing transactions. The processing process of Spring transactions is no longer analyzed. Here, we directly look at the core processing flow of Spring transaction enhancer TransactionInterceptor. The source code is as follows:


protected Object invokeWithinTransaction(Method method, Class<?> targetClass, final InvocationCallback invocation) throws Throwable {
  //  Get transaction properties 
  final TransactionAttribute txAttr = getTransactionAttributeSource().getTransactionAttribute(method, targetClass);
  // Object configured in the configuration TransactionManager
  final PlatformTransactionManager tm = determineTransactionManager(txAttr);
  final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
  
  //  Handling of Declarative Transactions 
  if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
    TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
    Object retVal = null;
    //......
    retVal = invocation.proceedWithInvocation();
    //......
    commitTransactionAfterReturning(txInfo);
    return retVal;
  } else {
    //  Processing of programmatic transactions ......
  }
  //......
}

Here we mainly look at the processing of declarative transactions, because the processing and submission of programmatic transactions are controlled by users in coding. In declarative transaction processing, when the method is finished, the commitTransactionAfterReturning method is executed to commit the transaction. The method is in the TransactionAspectSupport class, and the source code is as follows:


protected void commitTransactionAfterReturning(TransactionInfo txInfo) {
  if (txInfo != null && txInfo.hasTransaction()) {
    txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
  }
}

Look at the commit method again, which is in the AbstractPlatformTransactionManager class, and the source code is as follows:


public final void commit(TransactionStatus status) throws TransactionException {
    //  A lot of code is omitted here, such as transaction rollback ......
		processCommit(defStatus);
}
 
private void processCommit(DefaultTransactionStatus status) throws TransactionException {
		try {
			boolean beforeCompletionInvoked = false;
			try {
        prepareForCommit(status);
				triggerBeforeCommit(status);
				triggerBeforeCompletion(status);
				beforeCompletionInvoked = true;
				boolean globalRollbackOnly = false;
				if (status.isNewTransaction() || isFailEarlyOnGlobalRollbackOnly()) {
					globalRollbackOnly = status.isGlobalRollbackOnly();
				}
				if (status.hasSavepoint()) {
					status.releaseHeldSavepoint();
				} else if (status.isNewTransaction()) {
          //  Commit transaction 
					doCommit(status);
				}
				//......
			} catch (......) {
				//  Transaction exception handling ......
			}
 
			
			try {
        //  Handling after successful transaction commit ----- Here's the point 
				triggerAfterCommit(status);
			} finally {
				triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
			}
		}
		finally {
			cleanupAfterCompletion(status);
    }
}
 
private void triggerAfterCommit(DefaultTransactionStatus status) {
  if (status.isNewSynchronization()) {
    TransactionSynchronizationUtils.triggerAfterCommit();
  }
}

You end up in the TransactionSynchronizationUtils. triggerAfterCommit () method


public static void triggerAfterCommit() {
  invokeAfterCommit(TransactionSynchronizationManager.getSynchronizations());
}
 
public static void invokeAfterCommit(List<TransactionSynchronization> synchronizations) {
  if (synchronizations != null) {
      for (TransactionSynchronization synchronization : synchronizations) {
    synchronization.afterCommit();
      }
  }
}

Above, the TransactionSynchronization cached in TransactionSynchronizationManager will execute the afterCommit method in sequence, where TransactionSynchronization is cached as a collection in ThreadLocal of TransactionSynchronizationManager.

3.1, Mode 1

After the above analysis, it is only necessary to regenerate an TransactionSynchronization in the code and add it to the TransactionSynchronization collection of TransactionSynchronizationManager, so there is a solution, as follows:


private void writeStatisticsData(Long studentId) {
  if(TransactionSynchronizationManager.isActualTransactionActive()) {
            //  A transaction currently exists 
            TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
              @Override
              public void afterCommit() {
                executor.execute(() -> {Student student = studentService.findById(studentId);
                //........
                });
              }});
        } else {
            //  No transaction currently exists 
            executor.execute(() -> {Student student = studentService.findById(studentId);
                //........
                });
        }
}

3.2. Mode 2

Using @ TransactionalEventListener combined with Spring event listening mechanism, this annotation has been available since Spring version 4.2, as follows:


//  Events 
public class StudentEvent extends ApplicationEvent {
    public StudentEvent(Long studentId) {
        super(studentId);
    }
}
 
//  Listener 
public class StudentEventListener{
  @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
  public void writeStatisticsData(StudentEvent studentEvent) {
    executor.execute(() -> {
      Student student = studentService.findById(studentEvent.getSource());
      //........
    });
  }
}
 
@Service
public class StudentService {
  // Spring4.2 After that, ApplicationEventPublisher Automatically injected into the container, using Autowired Can be obtained 
  @Autowired
  private ApplicationEventPublisher applicationEventPublisher;
  
  @Transactional
  public Long test() {
    // ......
    //  Insert record 
    Long studentId = studentService.insert(student);
    //  Publish Events 
    applicationEventPublisher.publishEvent(new StudentEvent(studentId));
    //  Insert Student Address Record 
    Long addressId = addressService.insert(address);
    return studentId;
  }
}

Principle analysis

Spring Bean When loading the configuration file, AnnotationDrivenBeanDefinitionParser is used to parse the annotation-driven tags, as follows:


public class TxNamespaceHandler extends NamespaceHandlerSupport {
  //......
	@Override
	public void init() {
		registerBeanDefinitionParser("advice", new TxAdviceBeanDefinitionParser());
		registerBeanDefinitionParser("annotation-driven", new AnnotationDrivenBeanDefinitionParser());
		registerBeanDefinitionParser("jta-transaction-manager", new JtaTransactionManagerBeanDefinitionParser());
	}
}

@Transactional
public Long test() {
  // ......
  //  Insert record 
  Long studentId = studentService.insert(student);
   //  Asynchronous thread 
  writeStatisticsData(studentId);
  //  Insert Student Address Record 
  Long addressId = addressService.insert(address);
  return studentId;
}
 
private void writeStatisticsData(Long studentId) {
  executor.execute(() -> {
    Student student = studentService.findById(studentId);
    //........
  });
}
0

@Transactional
public Long test() {
  // ......
  //  Insert record 
  Long studentId = studentService.insert(student);
   //  Asynchronous thread 
  writeStatisticsData(studentId);
  //  Insert Student Address Record 
  Long addressId = addressService.insert(address);
  return studentId;
}
 
private void writeStatisticsData(Long studentId) {
  executor.execute(() -> {
    Student student = studentService.findById(studentId);
    //........
  });
}
1

@Transactional
public Long test() {
  // ......
  //  Insert record 
  Long studentId = studentService.insert(student);
   //  Asynchronous thread 
  writeStatisticsData(studentId);
  //  Insert Student Address Record 
  Long addressId = addressService.insert(address);
  return studentId;
}
 
private void writeStatisticsData(Long studentId) {
  executor.execute(() -> {
    Student student = studentService.findById(studentId);
    //........
  });
}
2

The above @ TransactionalEventListener is essentially a @ EventListener, and the TransactionalEventListenerFactory class wraps every scanned method with TransactionalEventListener annotation into an ApplicationListenerMethodTransactionalAdapter object. Through the onApplicationEvent method of ApplicationListenerMethodTransactionalAdapter, it can be seen that if there is a transaction at present, TransactionSynchronization will be generated and added to the cached ThreadLocal collection of TransactionSynchronizationManager, and the remaining process is the same as the above analysis. (Using @ TransactionalEventListener combined with Spring event listening mechanism, and using asynchronous mode feels a little awkward, here is to illustrate the problem).

4. Use cases


ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(5,
        new ThreadFactoryBuilder().setDaemon(false).setNamePrefix("execApiCache").build());
 
@Override
@Transactional(rollbackFor = Exception.class)
public ResultVO addApi(Api api, List<Header> headerList, List<Request> requestList, Response response, List<Script> scriptList, List<RespCodeMapping> respCodeMappingList) {
 
    //  Database code ...
 
    //  Asynchronous code 
    TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
        @Override
        public void afterCommit() {
            log.warn("afterCommit...");
            executorService.execute(() -> {
                //  Asynchronous service 
                execApiCache(api);
            });
    }});
 
    return ResultUtil.buildSucc();
}

Ps: setDaemon (false) Note that the daemon thread tag here must be set to false, otherwise the main thread is finished and the asynchronous thread is not finished, the asynchronous thread will be interrupted and closed immediately, so it cannot be set as a daemon (user) thread here.


Related articles: