Spring TransactionalEventListener: Resolving Data That Cannot Be Read Before the Transaction Commits
- 2021-11-13 07:50:29
- OfStack
1. Background
While working on a business process, we ran into the following problem. Code 1 is the original code, which ran normally. Code 2 is the code after one iteration, which no longer runs correctly.
Code 1: after the following code starts the thread, the code executes normally
ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS,
        new ArrayBlockingQueue<Runnable>(5));
@Transactional
public Long test() {
    // ......
    // Insert record
    Long studentId = studentService.insert(student);
    // Asynchronous thread
    writeStatisticsData(studentId);
    return studentId;
}
private void writeStatisticsData(Long studentId) {
    executor.execute(() -> {
        Student student = studentService.findById(studentId);
        //........
    });
}
Code 2: The following code does not execute properly after opening the thread
@Transactional
public Long test() {
    // ......
    // Insert record
    Long studentId = studentService.insert(student);
    // Asynchronous thread
    writeStatisticsData(studentId);
    // Insert Student Address Record
    Long addressId = addressService.insert(address);
    return studentId;
}
private void writeStatisticsData(Long studentId) {
    executor.execute(() -> {
        Student student = studentService.findById(studentId);
        //........
    });
}
2. Problem analysis
Spring transactions are used here, so the isolation level of the transaction has to be considered.
2.1. MySQL Isolation Level
View the MySQL isolation level (on MySQL 8.0 and later the variable is named @@transaction_isolation):
SELECT @@tx_isolation;
READ-COMMITTED
READ COMMITTED means that while transaction A is inserting data, transaction B cannot read the rows inserted by A until A commits; once A has committed, B can read them. Consequently, READ COMMITTED does not guarantee that the same data is read every time within one transaction, because other concurrent transactions may modify and commit that data between two reads.
2.2. Analysis of the cause of the problem
Why Code 1 works properly
The MySQL isolation level is READ COMMITTED. After the test method starts the asynchronous thread, that thread opens its own transaction and, as a reader, queries the student record inserted in the test method. In Code 1 the test method has usually already committed its transaction by that time, so the asynchronous method can read the student record. This code is still risky: if the transaction commits slightly later, the asynchronous thread may not find the student record.
Why Code 2 doesn't work properly
Following the analysis above, the asynchronous method cannot read the student record. Code 2 performs further operations after starting the asynchronous thread, which delays the commit of the transaction in the test method, so the asynchronous read runs before the commit and Code 2 fails.
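The timing problem can be reproduced without a database. The following is only a minimal sketch: the Tx class and the committed map are hypothetical stand-ins for a READ COMMITTED transaction and its table, not Spring or MySQL APIs. Writes stay private to the transaction until commit(), and a reader on another thread sees committed rows only.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class CommitRaceDemo {

    /** Rows visible to other transactions (what a READ COMMITTED reader sees). */
    static final Map<Long, String> committed = new ConcurrentHashMap<>();

    /** Hypothetical stand-in for a transaction: writes stay private until commit(). */
    static final class Tx {
        private final Map<Long, String> pending = new HashMap<>();

        void insert(long id, String name) {
            pending.put(id, name);
        }

        void commit() {
            committed.putAll(pending);
            pending.clear();
        }
    }

    /** Reads on another thread, as the statistics task does, and waits for the result. */
    static Optional<String> readFromOtherThread(ExecutorService pool, long id) throws Exception {
        Future<Optional<String>> result = pool.submit(() -> Optional.ofNullable(committed.get(id)));
        return result.get();
    }

    /** Returns { visible before commit, visible after commit }. */
    static boolean[] run() throws Exception {
        committed.clear();
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Tx tx = new Tx();
            tx.insert(1L, "student");
            // Code 2 situation: the asynchronous read happens before the commit
            boolean seenBeforeCommit = readFromOtherThread(pool, 1L).isPresent();
            tx.commit();
            // Desired situation: the asynchronous read happens after the commit
            boolean seenAfterCommit = readFromOtherThread(pool, 1L).isPresent();
            return new boolean[] { seenBeforeCommit, seenAfterCommit };
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        boolean[] r = run();
        System.out.println("visible before commit: " + r[0]);
        System.out.println("visible after commit:  " + r[1]);
    }
}
```

Running main prints false for the read before the commit and true for the read after it, which is the difference between Code 2 and a task that is started only after the commit.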
3. Problem Solutions
The solution is to perform the other processing (such as asynchronous message handling) only after the transaction has been committed. To see how, we start from how Spring executes a transaction. The full transaction processing flow is not analyzed again here; we look directly at the core flow of the Spring transaction interceptor TransactionInterceptor. The source code is as follows:
protected Object invokeWithinTransaction(Method method, Class<?> targetClass, final InvocationCallback invocation) throws Throwable {
    // Get transaction properties
    final TransactionAttribute txAttr = getTransactionAttributeSource().getTransactionAttribute(method, targetClass);
    // Object configured in the configuration TransactionManager
    final PlatformTransactionManager tm = determineTransactionManager(txAttr);
    final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
    // Handling of Declarative Transactions
    if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
        TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
        Object retVal = null;
        //......
        retVal = invocation.proceedWithInvocation();
        //......
        commitTransactionAfterReturning(txInfo);
        return retVal;
    } else {
        // Processing of programmatic transactions ......
    }
    //......
}
We focus on declarative transactions here, because with programmatic transactions the user controls processing and commit in code. For a declarative transaction, once the method has finished, the commitTransactionAfterReturning method is executed to commit the transaction. This method is in the TransactionAspectSupport class, and its source code is as follows:
protected void commitTransactionAfterReturning(TransactionInfo txInfo) {
    if (txInfo != null && txInfo.hasTransaction()) {
        txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
    }
}
Next, look at the commit method, which is in the AbstractPlatformTransactionManager class. Its source code is as follows:
public final void commit(TransactionStatus status) throws TransactionException {
    // A lot of code is omitted here, such as transaction rollback ......
    processCommit(defStatus);
}
private void processCommit(DefaultTransactionStatus status) throws TransactionException {
    try {
        boolean beforeCompletionInvoked = false;
        try {
            prepareForCommit(status);
            triggerBeforeCommit(status);
            triggerBeforeCompletion(status);
            beforeCompletionInvoked = true;
            boolean globalRollbackOnly = false;
            if (status.isNewTransaction() || isFailEarlyOnGlobalRollbackOnly()) {
                globalRollbackOnly = status.isGlobalRollbackOnly();
            }
            if (status.hasSavepoint()) {
                status.releaseHeldSavepoint();
            } else if (status.isNewTransaction()) {
                // Commit transaction
                doCommit(status);
            }
            //......
        } catch (......) {
            // Transaction exception handling ......
        }
        try {
            // Handling after successful transaction commit ----- Here's the point
            triggerAfterCommit(status);
        } finally {
            triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
        }
    }
    finally {
        cleanupAfterCompletion(status);
    }
}
private void triggerAfterCommit(DefaultTransactionStatus status) {
    if (status.isNewSynchronization()) {
        TransactionSynchronizationUtils.triggerAfterCommit();
    }
}
The call finally reaches the TransactionSynchronizationUtils.triggerAfterCommit() method:
public static void triggerAfterCommit() {
    invokeAfterCommit(TransactionSynchronizationManager.getSynchronizations());
}
public static void invokeAfterCommit(List<TransactionSynchronization> synchronizations) {
    if (synchronizations != null) {
        for (TransactionSynchronization synchronization : synchronizations) {
            synchronization.afterCommit();
        }
    }
}
As shown above, each TransactionSynchronization cached in TransactionSynchronizationManager has its afterCommit method executed in sequence. The TransactionSynchronization objects are held as a collection in a ThreadLocal of TransactionSynchronizationManager.
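The mechanism traced above can be modelled in a few lines of plain Java. This is only a sketch under simplifying assumptions: the Synchronization interface, register and commit are hypothetical stand-ins for TransactionSynchronization, registerSynchronization and processCommit, and none of Spring's error handling or nesting logic is reproduced.

```java
import java.util.ArrayList;
import java.util.List;

class AfterCommitModel {

    /** Simplified stand-in for Spring's TransactionSynchronization. */
    interface Synchronization {
        void afterCommit();
    }

    /** Per-thread callback list, analogous to the ThreadLocal collection described above. */
    private static final ThreadLocal<List<Synchronization>> SYNCHRONIZATIONS =
            ThreadLocal.withInitial(ArrayList::new);

    static void register(Synchronization synchronization) {
        SYNCHRONIZATIONS.get().add(synchronization);
    }

    /** Same ordering as processCommit: commit first, then callbacks in order, then cleanup. */
    static void commit(List<String> log) {
        try {
            log.add("doCommit");
            for (Synchronization synchronization : SYNCHRONIZATIONS.get()) {
                synchronization.afterCommit();
            }
        } finally {
            SYNCHRONIZATIONS.remove();
        }
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();
        // Callbacks are registered in the middle of the business method...
        register(() -> log.add("afterCommit #1"));
        register(() -> log.add("afterCommit #2"));
        log.add("business logic");
        // ...but they only run once the commit has happened
        commit(log);
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The log produced by run() is business logic, doCommit, afterCommit #1, afterCommit #2: work registered as a synchronization is deferred until the data is committed, regardless of where in the method it was registered.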
3.1. Mode 1
From the analysis above, it is enough to create a new TransactionSynchronization in the code and add it to the TransactionSynchronization collection of TransactionSynchronizationManager. This gives the following solution:
private void writeStatisticsData(Long studentId) {
    if (TransactionSynchronizationManager.isActualTransactionActive()) {
        // A transaction currently exists
        TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
            @Override
            public void afterCommit() {
                executor.execute(() -> {
                    Student student = studentService.findById(studentId);
                    //........
                });
            }
        });
    } else {
        // No transaction currently exists
        executor.execute(() -> {
            Student student = studentService.findById(studentId);
            //........
        });
    }
}
3.2. Mode 2
Use @TransactionalEventListener together with the Spring event listening mechanism. This annotation has been available since Spring 4.2. For example:
// Events
public class StudentEvent extends ApplicationEvent {
    public StudentEvent(Long studentId) {
        super(studentId);
    }
}
// Listener (it must be a Spring bean so that the annotated method is detected)
@Component
public class StudentEventListener {
    @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
    public void writeStatisticsData(StudentEvent studentEvent) {
        executor.execute(() -> {
            // getSource() returns Object, so cast it back to the student ID
            Student student = studentService.findById((Long) studentEvent.getSource());
            //........
        });
    }
}
@Service
public class StudentService {
    // ApplicationEventPublisher is provided by the container and can be obtained with @Autowired
    @Autowired
    private ApplicationEventPublisher applicationEventPublisher;
    @Transactional
    public Long test() {
        // ......
        // Insert record
        Long studentId = studentService.insert(student);
        // Publish Events
        applicationEventPublisher.publishEvent(new StudentEvent(studentId));
        // Insert Student Address Record
        Long addressId = addressService.insert(address);
        return studentId;
    }
}
Principle analysis
When Spring loads the bean configuration file, AnnotationDrivenBeanDefinitionParser is used to parse the annotation-driven tag, as follows:
public class TxNamespaceHandler extends NamespaceHandlerSupport {
    //......
    @Override
    public void init() {
        registerBeanDefinitionParser("advice", new TxAdviceBeanDefinitionParser());
        registerBeanDefinitionParser("annotation-driven", new AnnotationDrivenBeanDefinitionParser());
        registerBeanDefinitionParser("jta-transaction-manager", new JtaTransactionManagerBeanDefinitionParser());
    }
}
@TransactionalEventListener is essentially an @EventListener. The TransactionalEventListenerFactory class wraps every scanned method annotated with @TransactionalEventListener in an ApplicationListenerMethodTransactionalAdapter object. The onApplicationEvent method of ApplicationListenerMethodTransactionalAdapter shows that if a transaction is currently active, a TransactionSynchronization is created and added to the ThreadLocal collection cached in TransactionSynchronizationManager; the rest of the process is the same as analyzed above. (Combining @TransactionalEventListener with the Spring event mechanism and then going asynchronous inside the listener feels a little awkward; it is used here only to illustrate the problem.)
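The adapter's decision can be sketched in plain Java as well. Everything here is a hypothetical model (begin, commit and publish are not Spring APIs); the assumption it encodes is the documented behaviour of @TransactionalEventListener: inside a transaction the listener is deferred to the commit, and outside a transaction the event is discarded unless fallbackExecution is set to true.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class TransactionalListenerModel {

    /** Callbacks deferred until the modelled transaction on this thread commits. */
    private static final ThreadLocal<List<Runnable>> AFTER_COMMIT = new ThreadLocal<>();

    static void begin() {
        AFTER_COMMIT.set(new ArrayList<>());
    }

    static boolean isTransactionActive() {
        return AFTER_COMMIT.get() != null;
    }

    static void commit() {
        List<Runnable> callbacks = AFTER_COMMIT.get();
        AFTER_COMMIT.remove();
        callbacks.forEach(Runnable::run);
    }

    /** Models the adapter: defer to commit, run immediately, or discard the event. */
    static <E> void publish(E event, Consumer<E> listener, boolean fallbackExecution) {
        if (isTransactionActive()) {
            AFTER_COMMIT.get().add(() -> listener.accept(event));
        } else if (fallbackExecution) {
            listener.accept(event);
        }
        // Otherwise there is no transaction and no fallback: the event is dropped
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();
        begin();
        publish(1L, id -> log.add("handled " + id), false); // deferred until commit
        log.add("insert address");
        commit();
        publish(2L, id -> log.add("handled " + id), false); // no transaction: discarded
        publish(3L, id -> log.add("handled " + id), true);  // no transaction: runs immediately
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The resulting log is insert address, handled 1, handled 3. The silently dropped second event is worth keeping in mind when a method that publishes events may also be called outside a transaction.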
4. Use cases
ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(5,
        new ThreadFactoryBuilder().setDaemon(false).setNamePrefix("execApiCache").build());
@Override
@Transactional(rollbackFor = Exception.class)
public ResultVO addApi(Api api, List<Header> headerList, List<Request> requestList, Response response, List<Script> scriptList, List<RespCodeMapping> respCodeMappingList) {
    // Database code ...
    // Asynchronous code
    TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
        @Override
        public void afterCommit() {
            log.warn("afterCommit...");
            executorService.execute(() -> {
                // Asynchronous service
                execApiCache(api);
            });
        }
    });
    return ResultUtil.buildSucc();
}
Ps: setDaemon(false): the daemon flag must be set to false here. Daemon threads do not keep the JVM alive, so if all user (non-daemon) threads finish while the asynchronous task is still running, the JVM exits and the task is cut off before it completes. The worker threads must therefore be user threads, not daemon threads.
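If no builder library is on the classpath, the same two settings can be applied with a small JDK-only factory. This is a sketch: NamedThreadFactory and its naming scheme (prefix plus a counter) are assumptions for illustration, not the builder used in the example above.

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

class NamedThreadFactory implements ThreadFactory {

    private final String prefix;
    private final boolean daemon;
    private final AtomicInteger counter = new AtomicInteger(1);

    NamedThreadFactory(String prefix, boolean daemon) {
        this.prefix = prefix;
        this.daemon = daemon;
    }

    @Override
    public Thread newThread(Runnable task) {
        Thread thread = new Thread(task, prefix + counter.getAndIncrement());
        // Set explicitly: a new thread otherwise inherits the daemon flag of its creator
        thread.setDaemon(daemon);
        return thread;
    }

    public static void main(String[] args) {
        Thread thread = new NamedThreadFactory("execApiCache", false).newThread(() -> { });
        System.out.println(thread.getName() + " daemon=" + thread.isDaemon());
    }
}
```

It could be passed to the pool as new ScheduledThreadPoolExecutor(5, new NamedThreadFactory("execApiCache", false)). Because the workers are user threads, the pool should be shut down explicitly when the application stops, otherwise the JVM will not exit.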