RocketMQ Delay Level Configuration Mode

  • 2021-10-16 01:45:53
  • OfStack

RocketMQ supports timing messages, but does not support arbitrary time precision. It only supports specific level, such as timing 5s, 10s, 1m, etc.

Among them, level=0 level means no delay, level=1 level delay, level=2 level delay, and so on.

How to configure:

Add the following line to the properties configuration file on the server side (rocketmq-broker side):

messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

The corresponding mapping relationship between each level and delay time is described.

This configuration item configures the delay time of each level starting from level 1, such as 1 for delay 1s, 2 for delay 5s, and 14 for delay 10m, and the delay time of this specified level can be modified;

Time unit support: s, m, h, d, respectively representing seconds, minutes, hours, days;

The default value is stated above and can be adjusted manually;

The default value is sufficient, so it is not recommended to adjust [for reference only, or according to actual needs. When adjusting the default value, pay attention to modifying the value of level level corresponding to the time at the same time]

How to send a delayed message:

To send a delayed message, you only need to set the delay level delayLevel in the message (com. alibaba. rocketmq. common. ES50Message) to be sent by the client (rocketmq-client).


Message msg = new Message(topicName,"",keys,message.getBytes());
msg.setDelayTimeLevel(delayLevel);
SendResult sendResult = getMQProducer.send(msg);

RocketMQ Timing (Delay) Message

RocketMQ does not support delayed messages customized at any time, only delayed messages with built-in default delay intervals are supported.

The default delay interval is:

1s, 5s, 10s, 30s, 1m, 2m, 3m, 4m, 5m, 6m, 7m, 8m, 9m, 10m, 20m, 30m, 1h, 2h

Use scenarios of delayed messages

For example, in e-commerce, a delay message can be sent after submitting an order. After 1h, check the status of the order, and cancel the order to release the inventory if it is still unpaid.

Production


package com.xin.rocketmq.demo.testrun;
import com.xin.rocketmq.demo.config.JmsConfig;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
public class ProducerDelay {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        producer.setNamesrvAddr("192.168.10.11:9876");
        producer.start();
        Message msg1 = new Message(
                JmsConfig.TOPIC,
                " Order 001".getBytes());
        msg1.setDelayTimeLevel(2);// Delay 5 Seconds 
        Message msg2 = new Message(
                JmsConfig.TOPIC,
                " Order 001".getBytes());
        msg2.setDelayTimeLevel(4);// Delay 30 Seconds 
        SendResult sendResult1 = producer.send(msg1);
        SendResult sendResult2 = producer.send(msg2);
        System.out.println("Product1- Synchronous transmission -Product Information ={}" + sendResult1);
        System.out.println("Product2- Synchronous transmission -Product Information ={}" + sendResult2);
        producer.shutdown();
    }
}

Consumption


package com.xin.rocketmq.demo.testrun;
import com.xin.rocketmq.demo.config.JmsConfig;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class ConsumerDelay {
    public static void main(String[] args) throws Exception {
        //  Instantiate consumers 
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
        //  Settings NameServer The address of 
        consumer.setNamesrvAddr("192.168.10.11:9876");
        //  Subscribe 1 One or more Topic , and Tag To filter messages that need to be consumed 
        consumer.subscribe(JmsConfig.TOPIC, "*");
        //  Register a message listener 
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
                for (MessageExt message : messages) {
                    // Print approximate delay time period
                    System.out.println("Receive message[msgId=" + message.getMsgId() + "] " + (System.currentTimeMillis() - message.getStoreTimestamp()) + "ms later");
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //  Activate the consumer 
        consumer.start();
    }
}

Related articles: