How to Configure RocketMQ Delay Levels
- 2021-10-16 01:45:53
- OfStack
RocketMQ supports scheduled (delayed) messages, but not with arbitrary time precision. It only supports a fixed set of delay levels, such as 5s, 10s, or 1m.
Level 0 means no delay, level 1 applies the first configured delay time, level 2 the second, and so on.
How to configure:
Add the following line to the properties configuration file on the server side (rocketmq-broker side):
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
This item defines the mapping between each level and its delay time.
The delay times are listed in order starting from level 1: with the value above, level 1 delays 1s, level 2 delays 5s, and level 14 delays 10m. The delay time of any level can be changed;
Supported time units: s, m, h, d, representing seconds, minutes, hours, and days respectively;
The value shown above is the default and can be adjusted manually;
The default is sufficient for most uses, so changing it is not recommended unless you actually need to. If you do change it, also update the level numbers that client code passes for each delay time, because the mapping between levels and times shifts.
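The level-to-delay mapping described above can be sketched in plain Java. This is only an illustration of how a messageDelayLevel value maps to levels, not the broker's actual parsing code; the class name DelayLevelTable and its methods are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: illustrates the mapping only, not the broker's implementation.
public class DelayLevelTable {

    /** Parses a value such as "1s 5s 10s" into a map from level (starting at 1) to delay in milliseconds. */
    public static Map<Integer, Long> parse(String messageDelayLevel) {
        Map<Integer, Long> table = new LinkedHashMap<>();
        String[] entries = messageDelayLevel.trim().split("\\s+");
        for (int i = 0; i < entries.length; i++) {
            String entry = entries[i];
            // The last character is the unit; everything before it is the amount.
            char unit = entry.charAt(entry.length() - 1);
            long amount = Long.parseLong(entry.substring(0, entry.length() - 1));
            table.put(i + 1, amount * unitMillis(unit));
        }
        return table;
    }

    /** Converts one of the supported units (s, m, h, d) to milliseconds. */
    private static long unitMillis(char unit) {
        switch (unit) {
            case 's': return 1_000L;
            case 'm': return 60_000L;
            case 'h': return 3_600_000L;
            case 'd': return 86_400_000L;
            default: throw new IllegalArgumentException("Unsupported time unit: " + unit);
        }
    }

    public static void main(String[] args) {
        Map<Integer, Long> table =
                parse("1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h");
        table.forEach((level, millis) ->
                System.out.println("level " + level + " -> " + millis + " ms"));
    }
}
```

Printing the table this way makes it easy to see which level numbers move when an entry is inserted into or removed from the configured value.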
How to send a delayed message:
To send a delayed message, the client (rocketmq-client) only needs to set the delay level on the message (com.alibaba.rocketmq.common.message.Message) before sending it.
Message msg = new Message(topicName,"",keys,message.getBytes());
msg.setDelayTimeLevel(delayLevel);
SendResult sendResult = getMQProducer.send(msg);
RocketMQ Timing (Delay) Message
RocketMQ does not support delayed messages with an arbitrary delay; only the built-in delay intervals are supported.
The default delay intervals are:
1s, 5s, 10s, 30s, 1m, 2m, 3m, 4m, 5m, 6m, 7m, 8m, 9m, 10m, 20m, 30m, 1h, 2h
Use cases for delayed messages
For example, in e-commerce, a delayed message can be sent when an order is submitted. After 1h, the consumer checks the order status and, if the order is still unpaid, cancels it and releases the inventory.
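Because only fixed levels are available, the client has to translate a desired delay into a level number. The following is a minimal sketch of that lookup, assuming the broker still uses the default intervals listed above; the class DelayLevelPicker and the method levelFor are hypothetical names, and rounding up to the next level is just one possible policy.

```java
// Hypothetical helper: assumes the broker uses the default delay intervals.
public class DelayLevelPicker {

    // Default delays in seconds, in level order (index 0 is level 1).
    private static final long[] DEFAULT_DELAYS_SECONDS = {
        1, 5, 10, 30, 60, 120, 180, 240, 300, 360, 420, 480, 540, 600, 1200, 1800, 3600, 7200
    };

    /**
     * Returns the smallest level whose delay is at least delaySeconds.
     * Returns 0 (no delay) for non-positive input and throws if the
     * requested delay exceeds the longest default level.
     */
    public static int levelFor(long delaySeconds) {
        if (delaySeconds <= 0) {
            return 0;
        }
        for (int i = 0; i < DEFAULT_DELAYS_SECONDS.length; i++) {
            if (DEFAULT_DELAYS_SECONDS[i] >= delaySeconds) {
                return i + 1; // levels start at 1
            }
        }
        throw new IllegalArgumentException(
                "No default delay level covers " + delaySeconds + " seconds");
    }

    public static void main(String[] args) {
        // Order-timeout check after 1h: prints 17
        System.out.println(levelFor(3600));
        // 45s has no exact level, so it rounds up to 1m: prints 5
        System.out.println(levelFor(45));
    }
}
```

The returned value is what would be passed to setDelayTimeLevel. If the broker's messageDelayLevel has been customized, the array above would need to match that configuration.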
Production
package com.xin.rocketmq.demo.testrun;

import com.xin.rocketmq.demo.config.JmsConfig;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class ProducerDelay {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        producer.setNamesrvAddr("192.168.10.11:9876");
        producer.start();

        Message msg1 = new Message(
                JmsConfig.TOPIC,
                "Order 001".getBytes());
        msg1.setDelayTimeLevel(2); // Level 2: delay 5 seconds

        Message msg2 = new Message(
                JmsConfig.TOPIC,
                "Order 001".getBytes());
        msg2.setDelayTimeLevel(4); // Level 4: delay 30 seconds

        // Send both messages synchronously
        SendResult sendResult1 = producer.send(msg1);
        SendResult sendResult2 = producer.send(msg2);
        System.out.println("Product1 - synchronous send - result = " + sendResult1);
        System.out.println("Product2 - synchronous send - result = " + sendResult2);

        producer.shutdown();
    }
}
Consumption
package com.xin.rocketmq.demo.testrun;

import com.xin.rocketmq.demo.config.JmsConfig;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class ConsumerDelay {
    public static void main(String[] args) throws Exception {
        // Instantiate the consumer
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
        // Set the NameServer address
        consumer.setNamesrvAddr("192.168.10.11:9876");
        // Subscribe to a topic; the tag expression filters which messages are consumed ("*" matches all tags)
        consumer.subscribe(JmsConfig.TOPIC, "*");
        // Register a message listener
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
                for (MessageExt message : messages) {
                    // Print the approximate time elapsed since the message's store timestamp
                    System.out.println("Receive message[msgId=" + message.getMsgId() + "] " + (System.currentTimeMillis() - message.getStoreTimestamp()) + "ms later");
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // Start the consumer
        consumer.start();
    }
}