Integrating RabbitMQ with Spring Boot: advanced features for common production scenarios

  • 2021-11-29 23:46:11
  • OfStack

Catalog Summary Integration Dependencies and configurations
Producer Configuration Message Queuing Rules
Producer releases message
Consumers listen to messages

Summary

The integration scenario covers the topic working mode (with a suitable routingKey it can also stand in for the four other working modes: simple, work queue, publish/subscribe and routing), confirm (publisher confirmation), return (returned unroutable messages), basicAck (acknowledging a message), basicNack (rejecting a message), and a DLX (Dead Letter Exchange, i.e. a dead-letter queue) used to implement delayed/scheduled tasks, etc.

Integration

Dependencies and configurations

The following dependencies and configuration are shared by both the consumer and the producer.


    <!-- Inherit build defaults and managed dependency versions from the Spring Boot 2.1.4.RELEASE parent;
         this is why the dependencies below declare no <version>. -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.4.RELEASE</version>
    </parent>

    <dependencies>
        <!-- AMQP starter: used by the RabbitTemplate, @RabbitListener and queue/exchange declarations in this example. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <!-- Web starter: used by the @RestController that triggers message publishing. -->
        <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
    </dependencies>

# HTTP port of the application
server.port=8090
# RabbitMQ broker connection
spring.rabbitmq.host=192.168.168.10
spring.rabbitmq.port=5672
spring.rabbitmq.username=zheng123
spring.rabbitmq.password=zheng123
spring.rabbitmq.virtual-host=/zheng
# Publisher side: enable confirm and return callbacks on the RabbitTemplate
spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.publisher-returns=true
# Consumer side: manual acknowledgement.
# Spring Boot creates a "simple" listener container unless spring.rabbitmq.listener.type=direct is set,
# so the mode must be configured under listener.simple to take effect with the default container.
spring.rabbitmq.listener.simple.acknowledge-mode=manual
# Kept so the same mode applies if the container type is switched to "direct".
spring.rabbitmq.listener.direct.acknowledge-mode=manual

Producer Configuration Message Queuing Rules

Two ways of declaring the configuration are shown below; this integration example uses the first one.


/**
 * Declares the topic exchange, the two queues and the bindings used by this example.
 */
@Configuration
public class TopicConfig {

   /**
    * Declares the queue {@code topic_sb_mq_q1}.
    *
    * @return the queue definition
    */
   @Bean
   public Queue topicQ1() {
      return new Queue("topic_sb_mq_q1");
   }

   /**
    * Declares the durable queue {@code topic_sb_mq_q2} and attaches a dead-letter
    * exchange to it.
    *
    * <p>Messages that are dead-lettered from this queue are republished to
    * {@code topicExchange} with routing key {@code changsha.f}, which matches the
    * {@code changsha.*} binding of {@code topic_sb_mq_q1}. The {@code x-message-ttl}
    * argument (10000, in milliseconds per the RabbitMQ documentation) makes
    * unconsumed messages expire, which is what produces the delayed delivery.
    * To observe dead-lettering, the listener on this queue has to be switched off
    * so that messages stay in the queue until they expire.
    *
    * <p>The same queue can equivalently be declared with the constructor form
    * {@code new Queue("topic_sb_mq_q2", true, false, false, arguments)}, where
    * {@code arguments} is a {@code Map<String, Object>} holding the same three
    * entries. Only one form may be returned; having both in the method body does
    * not compile because the second one is unreachable.
    *
    * @return the queue definition
    */
   @Bean
   public Queue topicQ2() {
      return QueueBuilder.durable("topic_sb_mq_q2")
            .withArgument("x-dead-letter-exchange", "topicExchange")
            .withArgument("x-dead-letter-routing-key", "changsha.f")
            .withArgument("x-message-ttl", 10000)
            .build();
   }

   /**
    * Declares the topic exchange {@code topicExchange}.
    *
    * @return the exchange definition
    */
   @Bean
   public TopicExchange setTopicExchange() {
      return new TopicExchange("topicExchange");
   }

   /**
    * Binds {@code topic_sb_mq_q1} to the exchange with the pattern {@code changsha.*}.
    *
    * @return the binding definition
    */
   @Bean
   public Binding bindTopicHebei1() {
      return BindingBuilder.bind(topicQ1()).to(setTopicExchange()).with("changsha.*");
   }

   /**
    * Binds {@code topic_sb_mq_q2} to the exchange with the pattern {@code #.beijing}.
    *
    * @return the binding definition
    */
   @Bean
   public Binding bindTopicHebei2() {
      return BindingBuilder.bind(topicQ2()).to(setTopicExchange()).with("#.beijing");
   }
}

/**
 * Second way of declaring the messaging topology: a durable topic exchange,
 * a durable queue, and a binding that wires them together through qualified beans.
 */
@Configuration
public class RabbitMqConfig {

    /** Name of the queue declared by {@link #bootQueue()}. */
    public static final String QUEUE_NAME = "boot_queue";

    /** Name of the exchange declared by {@link #bootExchange()}. */
    public static final String EXCHANGE_NAME = "boot_topic_exchange";

    /**
     * Step 1: declare the exchange.
     *
     * @return a durable topic exchange named {@link #EXCHANGE_NAME}
     */
    @Bean("bootExchange")
    public Exchange bootExchange(){
        final boolean durable = true;
        return ExchangeBuilder
                .topicExchange(EXCHANGE_NAME)
                .durable(durable)
                .build();
    }

    /**
     * Step 2: declare the queue.
     *
     * @return a durable queue named {@link #QUEUE_NAME}
     */
    @Bean("bootQueue")
    public Queue bootQueue(){
        QueueBuilder durableQueue = QueueBuilder.durable(QUEUE_NAME);
        return durableQueue.build();
    }

    /**
     * Step 3: bind the queue to the exchange.
     *
     * <p>Topic mode covers both broadcast-like and routing-like behaviour:
     * a pattern of {@code "#"} matches every routing key, while a literal key
     * such as {@code "boot.1"} selects one specific key.
     *
     * @param queue    the {@code bootQueue} bean
     * @param exchange the {@code bootExchange} bean
     * @return the binding for the pattern {@code boot.#}
     */
    @Bean
    public Binding bindQueueExchange(@Qualifier("bootQueue") Queue queue, @Qualifier("bootExchange") Exchange exchange){
        final String routingPattern = "boot.#";
        return BindingBuilder
                .bind(queue)
                .to(exchange)
                .with(routingPattern)
                .noargs();
    }
}

Producer releases message


/**
 * REST endpoint that publishes text messages to the {@code topicExchange} exchange.
 */
@RestController
public class ProducerController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Registers the confirm and return callbacks on the template exactly once,
     * when Spring injects it.
     *
     * <p>The template is shared by all requests and accepts only a single
     * confirm callback, so the callbacks must not be re-registered inside the
     * request handler.
     *
     * @param template the template to configure
     */
    @Autowired
    void configureCallbacks(RabbitTemplate template) {
        // Confirm callback: tells whether the broker accepted the message.
        template.setConfirmCallback((correlationData, ack, cause) -> {
            if (ack) {
                // confirmed
            } else {
                // nack-ed
            }
        });

        // mandatory=true is required for unroutable messages to be handed to the return callback.
        template.setMandatory(true);
        template.setReturnCallback((msg, replyCode, replyText, exchange, routKey) -> {
            // return message
        });
    }

    /**
     * Publishes {@code message} as UTF-8 plain text to {@code topicExchange}.
     *
     * @param routingKey routing key to publish with; defaults to {@code changsha.kf} when absent
     * @param message    text to send; must not be {@code null}
     * @return a short description of what was sent
     * @throws IllegalArgumentException if {@code message} is {@code null}
     * @throws AmqpException if publishing fails
     * @throws UnsupportedEncodingException never thrown by this implementation; kept in the
     *         signature for compatibility with existing callers
     */
    @GetMapping(value="/topicSend")
    public Object topicSend(String routingKey,String message) throws AmqpException, UnsupportedEncodingException {
        if (message == null) {
            // Fail with a clear message instead of a bare NullPointerException from getBytes().
            throw new IllegalArgumentException("message must not be null");
        }
        if (null == routingKey) {
            routingKey = "changsha.kf";
        }

        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);

        // The message is only handed to the exchange; the topic exchange then routes it
        // to the queues whose binding pattern matches the routing key.
        byte[] body = message.getBytes(java.nio.charset.StandardCharsets.UTF_8);
        rabbitTemplate.send("topicExchange", routingKey, new Message(body, messageProperties));
        return "message sended : routingKey >"+routingKey+";message > "+message;
    }
}

Consumers listen to messages


/**
 * Listeners for the two queues bound to {@code topicExchange}.
 *
 * <p>Both listeners settle deliveries themselves through the {@code Channel}; this
 * assumes the listener container runs with manual acknowledge mode.
 */
@Component
public class ConcumerReceiver {

    /**
     * Handles deliveries from {@code topic_sb_mq_q1} and acknowledges them manually.
     *
     * <p>Topic routing note: a topic exchange routes a message to every queue that has
     * a matching binding, and a queue receives a single copy even when several of its
     * bindings match the routing key.
     *
     * @param msg     the converted message payload
     * @param message the raw message, used for its delivery tag and redelivery flag
     * @param channel the channel the delivery arrived on
     * @throws IOException if the acknowledgement cannot be sent
     */
    @RabbitListener(queues="topic_sb_mq_q1")
    public void topicReceiveq1(String msg,Message message, Channel channel) throws IOException {
       // Delivery tag identifying this delivery on the channel
       long deliveryTag = message.getMessageProperties().getDeliveryTag();
       try {
          // message.getBody() todosomething
       } catch (Exception e) {
          // Reject only this delivery (multiple=false). Requeue it once: a message that
          // already failed after a redelivery is not requeued again, otherwise a message
          // that always fails would be redelivered in an endless loop.
          boolean firstDelivery = !Boolean.TRUE.equals(message.getMessageProperties().isRedelivered());
          channel.basicNack(deliveryTag, false, firstDelivery);
          return;
       }
       // Acknowledge only this delivery; multiple=true would also acknowledge every
       // earlier unacknowledged delivery on the channel. Kept outside the try block so a
       // failing ack is not followed by a nack for the same tag.
       channel.basicAck(deliveryTag, false);
    }

    /**
     * Handles deliveries from {@code topic_sb_mq_q2}: prints the payload and then
     * acknowledges the delivery so it does not remain unacknowledged.
     *
     * @param msg     the converted message payload
     * @param message the raw message, used for its delivery tag
     * @param channel the channel the delivery arrived on
     * @throws IOException if the acknowledgement cannot be sent
     */
    @RabbitListener(queues="topic_sb_mq_q2")
    public void topicReceiveq2(String msg, Message message, Channel channel) throws IOException {
       topicReceiveq2(msg);
       channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }

    /**
     * Prints a payload received from {@code topic_sb_mq_q2}. Kept with its original
     * signature for existing callers; it does not acknowledge anything.
     *
     * @param message the message payload
     */
    public void topicReceiveq2(String message) {
       System.out.println("Topic Mode  topic_sb_mq_q2 received  message : " +message);
    }
}

Related articles: