Tutorial: Listening for Transactions with @TransactionalEventListener
- 2021-11-14 05:45:08
- OfStack
Listening for transactions with @TransactionalEventListener
Project background
I recently ran into the following problem in a project.
Method A performs INSERT, UPDATE, or DELETE operations and, as its last step, sends an MQ message to an external system. On receiving the message, the external system sends a request back to us, and our system handles that request in method B, which depends on the data modified by method A. If method A's transaction has not yet committed when the request arrives, method B reads the state from before the commit and works with stale data.
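The race described above can be reproduced with a toy model. The sketch below is not a real database or MQ client; `ToyStore`, `ToyTransaction`, and `StaleReadDemo` are hypothetical names used only to show that a reader who arrives before the commit cannot see the pending changes.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model (not a real database): writes stay private to the transaction until commit().
class ToyStore {
    private final Map<String, String> committed = new HashMap<>();

    // What any other request (such as method B) is able to see.
    String read(String key) {
        return committed.get(key);
    }

    ToyTransaction begin() {
        return new ToyTransaction(committed);
    }
}

class ToyTransaction {
    private final Map<String, String> committed;
    private final Map<String, String> pending = new HashMap<>();

    ToyTransaction(Map<String, String> committed) {
        this.committed = committed;
    }

    void write(String key, String value) {
        pending.put(key, value); // invisible to other readers for now
    }

    void commit() {
        committed.putAll(pending); // the changes become visible here
        pending.clear();
    }
}

class StaleReadDemo {
    public static void main(String[] args) {
        ToyStore store = new ToyStore();
        ToyTransaction tx = store.begin();
        tx.write("apply-1", "APPROVED"); // method A modifies data

        // The MQ message goes out here, and method B runs before A commits.
        System.out.println("B sees before commit: " + store.read("apply-1"));

        tx.commit();
        System.out.println("B sees after commit: " + store.read("apply-1"));
    }
}
```

In this model the first read returns null and the second returns the new value, which is why the message should not leave the system until the commit has happened.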
Solution: @TransactionalEventListener
Since Spring 4.2 there is an annotation, @TransactionalEventListener, that controls how events are handled in relation to a transaction. Spring's publish-subscribe model is not asynchronous by default; it is a synchronous mechanism for decoupling code. @TransactionalEventListener builds on the same model and simply adds transaction callbacks, so that an event is handled when the transaction commits, rolls back, and so on.
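The callback idea can be sketched in plain Java. This is a simplified model under stated assumptions, not Spring's implementation: `SketchTransaction`, `onCommit`, and `onRollback` are hypothetical names. Publishing only registers the work, and the work runs synchronously, on the same thread, once the commit has completed.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of a transaction that accepts completion callbacks.
// NOT Spring code; every name in this sketch is hypothetical.
class SketchTransaction {
    private final List<Runnable> afterCommit = new ArrayList<>();
    private final List<Runnable> afterRollback = new ArrayList<>();

    void onCommit(Runnable callback) {
        afterCommit.add(callback);
    }

    void onRollback(Runnable callback) {
        afterRollback.add(callback);
    }

    void commit() {
        // ... the real commit would happen here ...
        afterCommit.forEach(Runnable::run); // same thread, synchronous
    }

    void rollback() {
        // ... the real rollback would happen here ...
        afterRollback.forEach(Runnable::run);
    }
}

class DeferredCallbackDemo {
    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        SketchTransaction tx = new SketchTransaction();

        log.add("update data");
        tx.onCommit(() -> log.add("send MQ")); // "publish": registered, not yet run
        log.add("more work in the transaction");
        tx.commit();

        System.out.println(log); // prints [update data, more work in the transaction, send MQ]
    }
}
```

If `rollback()` is called instead of `commit()`, the "send MQ" callback never runs, which is the behaviour wanted for the MQ notification.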
Implementation
// Create an event class
package com.qk.cas.config;

import org.springframework.context.ApplicationEvent;

public class MyTransactionEvent extends ApplicationEvent {
    private static final long serialVersionUID = 1L;
    // The work to run once the transaction reaches the listener's phase
    private final IProcesser processer;

    public MyTransactionEvent(IProcesser processer) {
        super(processer);
        this.processer = processer;
    }

    public IProcesser getProcesser() {
        return this.processer;
    }

    @FunctionalInterface
    public interface IProcesser {
        void handle();
    }
}
// Create a listener class
package com.qk.cas.config;

import org.springframework.stereotype.Component;
import org.springframework.transaction.event.TransactionPhase;
import org.springframework.transaction.event.TransactionalEventListener;

@Component
public class MyTransactionListener {
    // Invoked only after the publishing transaction has committed
    @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
    public void handleOrderCreatedEvent(MyTransactionEvent event) {
        event.getProcesser().handle();
    }
}
// Changes to the method that sends the MQ message
@Autowired
private ApplicationEventPublisher eventPublisher;
@Autowired
private AmqpTemplate rabbitTemplate;

public void sendCreditResult(String applyNo, String jsonString) {
    // Wrap the send in an event; the listener runs the lambda after the commit
    eventPublisher.publishEvent(new MyTransactionEvent(() -> {
        LOGGER.info("MQ . APPLY_NO:[{}] . KEY:[{}] . Notification message: [{}]", applyNo, Queues.CREDIT_RESULT, jsonString);
        rabbitTemplate.convertAndSend(Queues.CREDIT_RESULT, jsonString);
    }));
}
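One point worth checking when adopting this pattern: @TransactionalEventListener has a fallbackExecution attribute (default false), and by default the listener is not invoked when the event is published with no transaction running. So if sendCreditResult is ever called outside a transaction, the message would not be sent. The decision can be modelled as below; `FallbackModel` and `Outcome` are hypothetical names, and the code only mirrors the documented behaviour rather than Spring's internals.

```java
// Hypothetical model of how a transactional listener treats a published event.
// It mirrors the documented behaviour of fallbackExecution; it is not Spring code.
class FallbackModel {
    enum Outcome { DEFERRED_UNTIL_COMMIT, RUN_IMMEDIATELY, DISCARDED }

    static Outcome decide(boolean transactionActive, boolean fallbackExecution) {
        if (transactionActive) {
            // Normal case: the handler waits for the configured phase (AFTER_COMMIT here).
            return Outcome.DEFERRED_UNTIL_COMMIT;
        }
        // No transaction: the event is handled only if fallbackExecution is true.
        return fallbackExecution ? Outcome.RUN_IMMEDIATELY : Outcome.DISCARDED;
    }

    public static void main(String[] args) {
        System.out.println("in transaction:              " + decide(true, false));
        System.out.println("no transaction, default:     " + decide(false, false));
        System.out.println("no transaction, fallback on: " + decide(false, true));
    }
}
```

In real code the equivalent switch would be `@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT, fallbackExecution = true)` on the listener method.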
Further notes
With @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT), the listener method runs only after the current transaction has committed. The phase parameter defaults to AFTER_COMMIT and can take one of four enum values:
public enum TransactionPhase {
    /**
     * Fire the event before transaction commit.
     * @see TransactionSynchronization#beforeCommit(boolean)
     */
    BEFORE_COMMIT,
    /**
     * Fire the event after the commit has completed successfully.
     * <p>Note: This is a specialization of {@link #AFTER_COMPLETION} and
     * therefore executes in the same after-completion sequence of events,
     * (and not in {@link TransactionSynchronization#afterCommit()}).
     * @see TransactionSynchronization#afterCompletion(int)
     * @see TransactionSynchronization#STATUS_COMMITTED
     */
    AFTER_COMMIT,
    /**
     * Fire the event if the transaction has rolled back.
     * <p>Note: This is a specialization of {@link #AFTER_COMPLETION} and
     * therefore executes in the same after-completion sequence of events.
     * @see TransactionSynchronization#afterCompletion(int)
     * @see TransactionSynchronization#STATUS_ROLLED_BACK
     */
    AFTER_ROLLBACK,
    /**
     * Fire the event after the transaction has completed.
     * <p>For more fine-grained events, use {@link #AFTER_COMMIT} or
     * {@link #AFTER_ROLLBACK} to intercept transaction commit
     * or rollback, respectively.
     * @see TransactionSynchronization#afterCompletion(int)
     */
    AFTER_COMPLETION
}
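The Javadoc above can be condensed into a small truth table. The sketch below is a plain-Java model, not Spring code: `PhaseModel` and `LocalPhase` are hypothetical names, and `committed == false` is assumed to mean a rollback that happens before a commit is attempted (if the commit itself fails, BEFORE_COMMIT listeners will already have run).

```java
// Plain-Java model of which listener phases fire for a given transaction outcome.
// LocalPhase is a local stand-in for Spring's TransactionPhase enum.
class PhaseModel {
    enum LocalPhase { BEFORE_COMMIT, AFTER_COMMIT, AFTER_ROLLBACK, AFTER_COMPLETION }

    // Returns true if a listener bound to `phase` is invoked for this outcome.
    // Assumption: committed == false means the rollback occurred before any commit attempt.
    static boolean fires(LocalPhase phase, boolean committed) {
        switch (phase) {
            case BEFORE_COMMIT:
            case AFTER_COMMIT:
                return committed;
            case AFTER_ROLLBACK:
                return !committed;
            default:
                return true; // AFTER_COMPLETION fires for both outcomes
        }
    }

    public static void main(String[] args) {
        for (LocalPhase phase : LocalPhase.values()) {
            System.out.println(phase + ": commit=" + fires(phase, true)
                    + ", rollback=" + fires(phase, false));
        }
    }
}
```

For the MQ scenario in this article, AFTER_COMMIT is the phase that matches the requirement: the message goes out only when the data is actually visible to other requests.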
The @TransactionalEventListener annotation
For example, after a user registers, the system needs to compute the user's invitation relationships, which is a recursive operation. Registration itself involves several verification steps and generates basic initialization data. If we send an MQ message to trigger the invitation processing at this point, the consumer may start before the user's registration data has been persisted to the database; it then fails to find the data and raises an error.
@TransactionalEventListener listens for transaction events, so the message can be sent after the commit.
The event to listen for
package com.jinglitong.springshop.interceptor;
import com.jinglitong.springshop.entity.Customer;
import org.springframework.context.ApplicationEvent;
public class RegCustomerEvent extends ApplicationEvent {
    public RegCustomerEvent(Customer customer) {
        super(customer);
    }
}
The listener that runs after the commit
package com.jinglitong.springshop.interceptor;
import com.alibaba.fastjson.JSON;
import com.jinglitong.springshop.entity.Customer;
import com.jinglitong.springshop.entity.MqMessageRecord;
import com.jinglitong.springshop.servcie.MqMessageRecordService;
import com.jinglitong.springshop.util.AliMQServiceUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.transaction.event.TransactionPhase;
import org.springframework.transaction.event.TransactionalEventListener;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
@Component
@Slf4j
public class RegCustomerListener {
    @Value("${aliyun.mq.order.topic}")
    private String topic;
    @Value("${aliyun.mq.regist.product}")
    private String registGroup;
    @Value("${aliyun.mq.regist.tag}")
    private String registTag;
    @Autowired
    MqMessageRecordService mqMessageRecordService;

    // Runs only after the registration transaction has committed
    @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
    public void handleRegCustomerEvent(RegCustomerEvent regCustomerEvent) {
        Customer cust = (Customer) regCustomerEvent.getSource();
        Map<String, String> map = new HashMap<>();
        map.put("custId", cust.getZid());
        map.put("account", cust.getAccount());
        log.info("put regist notice to Mq start");
        String hdResult = AliMQServiceUtil.createNewOrder(cust.getZid(), JSON.toJSONString(map), topic, registTag, registGroup);
        // Record the send attempt; status is false when no message ID was returned
        MqMessageRecord insert = buildBean(cust.getZid(), hdResult, registTag, JSON.toJSONString(map), registGroup);
        insert.setStatus(StringUtils.isNotEmpty(hdResult));
        // Note: the original transaction is already committed at this point, so a
        // transactional insertRecord may need REQUIRES_NEW propagation to be persisted
        mqMessageRecordService.insertRecord(insert);
        log.info("put regist notice to Mq end");
        log.info("regist notice userId : {}", cust.getAccount());
    }

    // Builds the record that tracks the MQ message for this customer
    private MqMessageRecord buildBean(String custId, String result, String tag, String jsonStr, String groupId) {
        MqMessageRecord msg = new MqMessageRecord();
        msg.setFlowId(custId);
        msg.setGroupName(groupId);
        msg.setTopic(topic);
        msg.setTag(tag);
        msg.setMsgId(result);
        msg.setDataBody(jsonStr);
        msg.setSendType(3);
        msg.setGroupType(1);
        msg.setCreateTime(new Date());
        return msg;
    }
}
// In the registration code: publish the event inside the transaction
@Autowired
private ApplicationEventPublisher applicationEventPublisher;

applicationEventPublisher.publishEvent(new RegCustomerEvent(XXX));
This ensures that the asynchronous calculation runs only after the data has been persisted.