Solve the pit encountered by SpringBoot integrating RocketMQ
- 2021-10-16 01:31:12
- OfStack
Application scenario
When implementing RocketMQ consumption, 1 generally uses @ RocketMQMessageListener annotation to define Group, Topic and selectorExpression (data filtering, selection rules). In order to support dynamic filtering of data, 1 generally uses expressions, and then dynamically switches through apollo or cloud config.
Introducing dependency
<!-- RocketMq Spring Boot Starter-->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.4</version>
</dependency>
Consumer code
@RocketMQMessageListener(consumerGroup = "${rocketmq.group}",topic ="${rocketmq.topic}",selectorExpression = "${rocketmq.selectorExpression}")
public class Consumer implements RocketMQListener<String> {
@Override
public void onMessage(String s) {
System.out.println(" The data consumed are: "+s);
}
}
Problem troubleshooting
The whole annotation of RocketMQMessageListener defaults to selectorExpression *, which means receiving all the data under the current Topic. If we want to dynamically configure tags, we will find that all the data are filtered when using the ${rocketmq. selectorExpression} expression, and the tracking source code (ListenerContainerConfiguration. java) finds that the data of selectorExpression is overwritten after obtaining the corresponding data in the environment environment variable when creating listener, resulting in the whole filtering condition being changed to the expression.
@Override
public void afterSingletonsInstantiated() {
// Gets all of them that use the RocketMQMessageListener Annotated bean
Map<String, Object> beans = this.applicationContext.getBeansWithAnnotation(RocketMQMessageListener.class);
if (Objects.nonNull(beans)) {
// Circular registration container
beans.forEach(this::registerContainer);
}
}
private void registerContainer(String beanName, Object bean) {
Class<?> clazz = AopProxyUtils.ultimateTargetClass(bean);
// Verify the current bean Is it implemented RocketMQListener Interface
if (!RocketMQListener.class.isAssignableFrom(bean.getClass())) {
throw new IllegalStateException(clazz + " is not instance of " + RocketMQListener.class.getName());
}
// Get bean Above annotation
RocketMQMessageListener annotation = clazz.getAnnotation(RocketMQMessageListener.class);
// Analyse group And topic Supports expressions
String consumerGroup = this.environment.resolvePlaceholders(annotation.consumerGroup());
String topic = this.environment.resolvePlaceholders(annotation.topic());
boolean listenerEnabled =
(boolean)rocketMQProperties.getConsumer().getListeners().getOrDefault(consumerGroup, Collections.EMPTY_MAP)
.getOrDefault(topic, true);
if (!listenerEnabled) {
log.debug(
"Consumer Listener (group:{},topic:{}) is not enabled by configuration, will ignore initialization.",
consumerGroup, topic);
return;
}
validate(annotation);
String containerBeanName = String.format("%s_%s", DefaultRocketMQListenerContainer.class.getName(),
counter.incrementAndGet());
GenericApplicationContext genericApplicationContext = (GenericApplicationContext)applicationContext;
// Registration bean That calls the createRocketMQListenerContainer
genericApplicationContext.registerBean(containerBeanName, DefaultRocketMQListenerContainer.class,
() -> createRocketMQListenerContainer(containerBeanName, bean, annotation));
DefaultRocketMQListenerContainer container = genericApplicationContext.getBean(containerBeanName,
DefaultRocketMQListenerContainer.class);
if (!container.isRunning()) {
try {
container.start();
} catch (Exception e) {
log.error("Started container failed. {}", container, e);
throw new RuntimeException(e);
}
}
log.info("Register the listener to container, listenerBeanName:{}, containerBeanName:{}", beanName, containerBeanName);
}
private DefaultRocketMQListenerContainer createRocketMQListenerContainer(String name, Object bean,
RocketMQMessageListener annotation) {
DefaultRocketMQListenerContainer container = new DefaultRocketMQListenerContainer();
container.setRocketMQMessageListener(annotation);
String nameServer = environment.resolvePlaceholders(annotation.nameServer());
nameServer = StringUtils.isEmpty(nameServer) ? rocketMQProperties.getNameServer() : nameServer;
String accessChannel = environment.resolvePlaceholders(annotation.accessChannel());
container.setNameServer(nameServer);
if (!StringUtils.isEmpty(accessChannel)) {
container.setAccessChannel(AccessChannel.valueOf(accessChannel));
}
container.setTopic(environment.resolvePlaceholders(annotation.topic()));
// Here, the data has been fetched according to the expression
String tags = environment.resolvePlaceholders(annotation.selectorExpression());
if (!StringUtils.isEmpty(tags)) {
container.setSelectorExpression(tags);
}
container.setConsumerGroup(environment.resolvePlaceholders(annotation.consumerGroup()));
// Here will SelectorExpression The data of is overwritten into an expression
container.setRocketMQMessageListener(annotation);
container.setRocketMQListener((RocketMQListener)bean);
container.setObjectMapper(objectMapper);
container.setMessageConverter(rocketMQMessageConverter.getMessageConverter());
container.setName(name); // REVIEW ME, use the same clientId or multiple?
return container;
}
Problem solving
Because the ListenerContainerConfiguration class is an afterSingletonsInstantiated method that implements the SmartInitializingSingleton interface, we can parse and assign the data of selectorExpression through reflection before initializing ListenerContainerConfiguration.
/**
* In springboot After initialization, RocketMQ Dynamic change of data by reflection before initialization of container
**/
@Configuration
public class ChangeSelectorExpressionBeforeMQInit implements InitializingBean {
@Autowired
private ApplicationContext applicationContext;
@Autowired
private StandardEnvironment environment;
@Override
public void afterPropertiesSet() throws Exception {
Map<String,Object> beans =applicationContext.getBeansWithAnnotation(RocketMQMessageListener.class);
for (Object bean : beans.values()){
Class<?> clazz = AopProxyUtils.ultimateTargetClass(bean);
if (!RocketMQListener.class.isAssignableFrom(bean.getClass())) {
continue;
}
RocketMQMessageListener annotation = clazz.getAnnotation(RocketMQMessageListener.class);
InvocationHandler invocationHandler = Proxy.getInvocationHandler(annotation);
Field field = invocationHandler.getClass().getDeclaredField("memberValues");
field.setAccessible(true);
Map<String, Object> memberValues = (Map<String, Object>) field.get(invocationHandler);
for (Map.Entry<String,Object> entry: memberValues.entrySet()) {
if(Objects.nonNull(entry)){
memberValues.put(entry.getKey(),environment.resolvePlaceholders(String.valueOf(entry.getValue())));
}
}
}
}
}
This Bug has been fixed in the 2.1. 0 dependency package except for the first time, and it is recommended to use a 2.1. 0 or higher package without causing dependency conflicts.