Listening for Transactions with @ TransactionalEventListener Tutorial

  • 2021-11-14 05:45:08
  • OfStack

Directory @ TransactionalEventListener listening transaction recently encountered a problem in the project solution: @ TransactionalEventListener extended annotation @ TransactionalEventListener listening object listening after the operation

@ TransactionalEventListener Listening Transactions

Project background

I recently encountered a problem in the project

There are INSERT, UPDATE or DELETE operations in A method, and finally one MQ will be sent to the outside. After receiving MQ, the outside will send another request. After receiving the request, the system will execute B method, and B method will rely on the modified result of A method, which has a problem. If A method transaction is not submitted; And the request of B method will query the state before the transaction was committed, which will be problematic

Solution: @ TransactionalEventListener

In Spring 4.2 +, there is a method called TransactionEventListener that controls how Event events are handled during transactions. As we know, the publish-subscribe model of Spring is not really asynchronous, but synchronous to decouple the code. However, TransactionEventListener is still solved in this way, only by adding callback, so that Event can be processed when the transaction is Commited, Rollback, etc.

Concrete realization


// Create 1 Event classes 
package com.qk.cas.config;
import org.springframework.context.ApplicationEvent;
public class MyTransactionEvent extends ApplicationEvent {
    private static final long serialVersionUID = 1L;
    private IProcesser processer;
    public MyTransactionEvent(IProcesser processer) {
        super(processer);
        this.processer = processer;
    }
    public IProcesser getProcesser() {
        return this.processer;
    }
    @FunctionalInterface
    public interface IProcesser {
        void handle();
    }
}
// Create 1 Listening classes 
package com.qk.cas.config;
import org.springframework.stereotype.Component;
import org.springframework.transaction.event.TransactionPhase;
import org.springframework.transaction.event.TransactionalEventListener;
@Component
public class MyTransactionListener {
    @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
    public void hanldeOrderCreatedEvent(MyTransactionEvent event) {
        event.getProcesser().handle();
    }
}
//MQ Changes in methods 
    @Autowired
    private ApplicationEventPublisher eventPublisher;
    @Autowired
    private AmqpTemplate rabbitTemplate;
    public void sendCreditResult(String applyNo, String jsonString) {
        eventPublisher.publishEvent(new MyTransactionEvent(() -> {
            LOGGER.info("MQ . APPLY_NO:[{}] . KEY:[{}] . Notification message: [{}]", applyNo, Queues.CREDIT_RESULT, jsonString);
            rabbitTemplate.convertAndSend(Queues.CREDIT_RESULT, jsonString);
        }));
    }

Expand

@ TransactionalEventListener (phase = TransactionPhase.AFTER_COMMIT) Event-listening methods are executed only after the current transaction is committed, where the parameter phase defaults to AFTER_COMMIT, with four enumerations:


public enum TransactionPhase {
    /**
     * Fire the event before transaction commit.
     * @see TransactionSynchronization#beforeCommit(boolean)
     */
    BEFORE_COMMIT,
    /**
     * Fire the event after the commit has completed successfully.
     * <p>Note: This is a specialization of {@link #AFTER_COMPLETION} and
     * therefore executes in the same after-completion sequence of events,
     * (and not in {@link TransactionSynchronization#afterCommit()}).
     * @see TransactionSynchronization#afterCompletion(int)
     * @see TransactionSynchronization#STATUS_COMMITTED
     */
    AFTER_COMMIT,
    /**
     * Fire the event if the transaction has rolled back.
     * <p>Note: This is a specialization of {@link #AFTER_COMPLETION} and
     * therefore executes in the same after-completion sequence of events.
     * @see TransactionSynchronization#afterCompletion(int)
     * @see TransactionSynchronization#STATUS_ROLLED_BACK
     */
    AFTER_ROLLBACK,
    /**
     * Fire the event after the transaction has completed.
     * <p>For more fine-grained events, use {@link #AFTER_COMMIT} or
     * {@link #AFTER_ROLLBACK} to intercept transaction commit
     * or rollback, respectively.
     * @see TransactionSynchronization#afterCompletion(int)
     */
    AFTER_COMPLETION
}

Note @ TransactionalEventListener

For example, after the user registers, it is necessary to calculate the invitation relationship of the user and recursively operate. If the registration includes multi-step verification and generates basic initialization data, at this time, we send a message through mq to process this invitation relationship, and there will be a problem, that is, the invitation relationship will start to be executed before the user registers the data and puts it in the warehouse, but the data cannot be found, resulting in an error.

@ TransactionalEventListener enables transaction listening, which can be performed after commit.

Object to listen to


package com.jinglitong.springshop.interceptor; 
import com.jinglitong.springshop.entity.Customer;
import org.springframework.context.ApplicationEvent;
  
public class RegCustomerEvent extends ApplicationEvent{
    public RegCustomerEvent(Customer customer){
        super(customer);
    }
}

Actions after listening


package com.jinglitong.springshop.interceptor; 
import com.alibaba.fastjson.JSON;
import com.jinglitong.springshop.entity.Customer;
import com.jinglitong.springshop.entity.MqMessageRecord;
import com.jinglitong.springshop.servcie.MqMessageRecordService;
import com.jinglitong.springshop.util.AliMQServiceUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.transaction.event.TransactionPhase;
import org.springframework.transaction.event.TransactionalEventListener;  
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
 
@Component
@Slf4j
public class RegCustomerListener {
 
    @Value("${aliyun.mq.order.topic}")
    private String topic;
 
    @Value("${aliyun.mq.regist.product}")
    private String registGroup;
 
    @Value("${aliyun.mq.regist.tag}")
    private String registTag;
 
    @Autowired
    MqMessageRecordService mqMessageRecordService;
 
    @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
    public void hanldeRegCustomerEvent(RegCustomerEvent regCustomerEvent) {
        Customer cust = (Customer) regCustomerEvent.getSource();
        Map<String, String> map = new HashMap<String, String>();
        map.put("custId", cust.getZid());
        map.put("account", cust.getAccount());
        log.info("put regist notice to Mq start");
        String hdResult = AliMQServiceUtil.createNewOrder(cust.getZid(), JSON.toJSONString(map),topic,registTag,registGroup);
        MqMessageRecord insert = buidBean(cust.getZid(),hdResult,registTag,JSON.toJSONString(map),registGroup);
        if(StringUtils.isEmpty(hdResult)) {
            insert.setStatus(false);
        }else {
            insert.setStatus(true);
        }
        mqMessageRecordService.insertRecord(insert);
        log.info("put regist notice to Mq end");
        log.info("regist notice userId : " + cust.getAccount());
    }
 
    private MqMessageRecord buidBean (String custId,String result ,String tag,String jsonStr,String groupId) {
        MqMessageRecord msg = new MqMessageRecord();
        msg.setFlowId(custId);
        msg.setGroupName(groupId);
        msg.setTopic(topic);
        msg.setTag(tag);
        msg.setMsgId(result);
        msg.setDataBody(jsonStr);
        msg.setSendType(3);
        msg.setGroupType(1);
        msg.setCreateTime(new Date());
        return msg;
    } 
}
@Autowired
    private ApplicationEventPublisher applicationEventPublisher;
 
applicationEventPublisher.publishEvent(new RegCustomerEvent (XXX));

This ensures that asynchronous calculation is performed after the data is put into storage


Related articles: