Integrating RabbitMQ with Spring Boot

  • 2020-06-15 09:05:24
  • OfStack

As a leading implementation of AMQP, RabbitMQ is widely used in projects. Combined with Spring Boot, the current mainstream framework, it greatly simplifies the message communication problems that come up during development.

First, make sure RabbitMQ is properly installed and running.

RabbitMQ requires an Erlang environment, so first install the corresponding version of Erlang, which can be downloaded from the RabbitMQ website.


# rpm -ivh erlang-19.0.4-1.el7.centos.x86_64.rpm

Install RabbitMQ with yum so that missing dependency packages do not cause the installation to fail.


# yum install rabbitmq-server-3.6.6-1.el7.noarch.rpm

Start RabbitMQ


# /sbin/service rabbitmq-server start

Because the guest user that RabbitMQ provides by default can only connect locally, create an additional user for testing.


# /sbin/rabbitmqctl add_user test test123
The user name is test and the password is test123.

Enable the web management plugin


# rabbitmq-plugins enable rabbitmq_management

In the management console, set the user created earlier as an administrator and give it access to the virtual host /.

Add the Spring Boot dependency


<dependencies>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
  </dependency>
</dependencies>

Message producer

Add the following configuration to application.properties


spring.rabbitmq.host=192.168.1.107
spring.rabbitmq.port=5672
spring.rabbitmq.username=test
spring.rabbitmq.password=test123
spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.template.mandatory=true

Spring Boot configuration class that declares the queues, the exchange type, and the bindings


import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {

  // Declare the queues
  @Bean
  public Queue queue1() {
    return new Queue("hello.queue1", true); // true makes the queue durable
  }

  @Bean
  public Queue queue2() {
    return new Queue("hello.queue2", true);
  }

  // Declare the exchange
  @Bean
  TopicExchange topicExchange() {
    return new TopicExchange("topicExchange");
  }

  // Bind the queues to the exchange
  @Bean
  public Binding binding1() {
    return BindingBuilder.bind(queue1()).to(topicExchange()).with("key.1");
  }

  @Bean
  public Binding binding2() {
    return BindingBuilder.bind(queue2()).to(topicExchange()).with("key.#");
  }

}

Two queues, hello.queue1 and hello.queue2, are declared. The exchange type is TopicExchange, and the exchange is bound to the hello.queue1 and hello.queue2 queues with the routing keys key.1 and key.#, respectively.
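
To see why a message published with the routing key key.1 reaches both queues, the topic matching rule can be sketched in plain Java. This is only an illustrative model, not the code RabbitMQ or Spring AMQP uses; the class name TopicPatternSketch and the method matches are hypothetical. It assumes the AMQP topic rules: words are separated by dots, * stands for exactly one word, and # stands for zero or more words.

```java
class TopicPatternSketch {

    /** Returns true if the binding pattern accepts the routing key. */
    static boolean matches(String pattern, String routingKey) {
        return match(words(pattern), 0, words(routingKey), 0);
    }

    // Split on dots; an empty string has no words.
    private static String[] words(String s) {
        return s.isEmpty() ? new String[0] : s.split("\\.", -1);
    }

    private static boolean match(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length; // pattern used up: the key must be used up too
        }
        if (p[i].equals("#")) {
            // '#' may absorb zero or more words, so try every split point
            for (int next = j; next <= k.length; next++) {
                if (match(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            return false; // pattern still expects a word but the key has none left
        }
        boolean wordMatches = p[i].equals("*") || p[i].equals(k[j]);
        return wordMatches && match(p, i + 1, k, j + 1);
    }
}
```

Under this model, matches("key.1", "key.1") and matches("key.#", "key.1") are both true, so hello.queue1 and hello.queue2 each receive a copy of a message sent with key.1, while a message sent with key.2 would reach only hello.queue2.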

The producer class


import java.util.UUID;

import javax.annotation.PostConstruct;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class Sender implements RabbitTemplate.ConfirmCallback, ReturnCallback {

  @Autowired
  private RabbitTemplate rabbitTemplate;

  @PostConstruct
  public void init() {
    rabbitTemplate.setConfirmCallback(this);
    rabbitTemplate.setReturnCallback(this);
  }

  @Override
  public void confirm(CorrelationData correlationData, boolean ack, String cause) {
    if (ack) { 
      System.out.println(" Message sent successfully :" + correlationData); 
    } else { 
      System.out.println(" Message sending failed :" + cause); 
    } 

  }

  @Override
  public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
    System.out.println(message.getMessageProperties().getCorrelationIdString() + "  Send failure ");

  }

  // Sends a message. This method is called by external code and does not implement any interface.
  public void send(String msg){

    CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());

    System.out.println(" Start sending messages  : " + msg.toLowerCase());
    // convertSendAndReceive returns null if no reply arrives in time, so do not call toString() on the result
    Object response = rabbitTemplate.convertSendAndReceive("topicExchange", "key.1", msg, correlationId);
    System.out.println(" End sending message  : " + msg.toLowerCase());
    System.out.println(" Consumer response  : " + response + "  Message processing completed ");
  }
}

Key points:

1. Inject RabbitTemplate.

2. Implement the RabbitTemplate.ConfirmCallback and RabbitTemplate.ReturnCallback interfaces (optional).
ConfirmCallback receives the ack callback after a message has been sent to the RabbitMQ exchange. ReturnCallback is called when a message reaches the exchange but no queue is bound to the exchange for that message.

3. Implement the message sending method by calling the corresponding RabbitTemplate method. Commonly used sending methods are:


rabbitTemplate.send(message);  // Sends a message; the parameter type is org.springframework.amqp.core.Message
rabbitTemplate.convertAndSend(object); // Converts the object to an org.springframework.amqp.core.Message, then sends it
rabbitTemplate.convertSendAndReceive(message); // Converts and sends the message, then waits for a response message

Choose the sending method that suits the business scenario.
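
The interplay of the two callbacks described in point 2 can be modelled in plain Java. This is a sketch of the expected callback order for a message published with mandatory=true, not Spring AMQP code; the class PublisherCallbackSketch and its publish method are hypothetical. The case of a missing exchange producing a negative confirm is an assumption about typical behaviour and should be verified against the Spring AMQP version in use.

```java
import java.util.ArrayList;
import java.util.List;

class PublisherCallbackSketch {

    /**
     * Returns, in order, the callbacks a mandatory publish is expected to trigger.
     * exchangeExists: the target exchange is declared on the broker.
     * queueBound: at least one queue is bound for the message's routing key.
     */
    static List<String> publish(boolean exchangeExists, boolean queueBound) {
        List<String> callbacks = new ArrayList<>();
        if (!exchangeExists) {
            // Assumption: the failed publish surfaces as a negative confirm.
            callbacks.add("confirm(ack=false)");
            return callbacks;
        }
        if (!queueBound) {
            // The exchange accepted the message but could not route it.
            callbacks.add("returnedMessage");
        }
        // Reaching the exchange is enough for a positive confirm.
        callbacks.add("confirm(ack=true)");
        return callbacks;
    }
}
```

In this model, publish(true, false) yields returnedMessage followed by confirm(ack=true), which is why a positive confirm alone does not show that a queue received the message.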

Message consumer

In application.properties, configure the RabbitMQ connection and set both the minimum and maximum number of message listener threads to 2.
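
A consumer configuration consistent with the two listener threads described in this section might look like the following. This is a minimal sketch: it assumes the consumer connects to the same broker with the same test user as the producer, and it uses the Spring Boot 1.x property names. In Spring Boot 2.x the two listener settings are named spring.rabbitmq.listener.simple.concurrency and spring.rabbitmq.listener.simple.max-concurrency.

```properties
# Connection settings (assumed to be the same as the producer's)
spring.rabbitmq.host=192.168.1.107
spring.rabbitmq.port=5672
spring.rabbitmq.username=test
spring.rabbitmq.password=test123
# Minimum and maximum number of message listener threads (Spring Boot 1.x names)
spring.rabbitmq.listener.concurrency=2
spring.rabbitmq.listener.max-concurrency=2
```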

Consumer class



Because two queues are defined, a separate listener is defined for each queue. Because the minimum and maximum number of message listener threads are both 2, each listener uses two threads to listen.

Key points:

1. The listener's parameter type must match the actual message type. The producer sends a String, so the listener parameter is also a String.

2. If the listener needs to send a response back to the producer, simply return it from the listener method.
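
The request/reply flow behind point 2 can be illustrated with a plain Java model in which the listener's return value travels back to the caller. This is only a sketch using a local thread in place of the broker; the class RequestReplySketch and the method sendAndReceive are hypothetical. It also mirrors one real detail: RabbitTemplate's convertSendAndReceive returns null when no reply arrives within its reply timeout, so callers should check for null.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;

class RequestReplySketch {

    /**
     * Hands msg to the listener on a separate "consumer" thread and waits for
     * its return value. Returns null if no reply arrives within timeoutMs.
     */
    static String sendAndReceive(String msg, Function<String, String> listener, long timeoutMs) {
        ExecutorService consumer = Executors.newSingleThreadExecutor();
        try {
            Callable<String> task = () -> listener.apply(msg);
            Future<String> reply = consumer.submit(task);
            return reply.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return null; // no reply in time
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return null;
        } catch (ExecutionException e) {
            throw new IllegalStateException("listener failed", e.getCause());
        } finally {
            consumer.shutdownNow(); // stop the consumer thread
        }
    }
}
```

For example, sendAndReceive("abc", String::toUpperCase, 2000) returns the listener's return value, while a listener that takes longer than the timeout leaves the caller with null.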

Run the test



Output:


 Start sending messages  : wed mar 29 23:20:52 cst 2017
SimpleAsyncTaskExecutor-1  Receive from hello.queue2 Messages for queues: Wed Mar 29 23:20:52 CST 2017
SimpleAsyncTaskExecutor-2  Receive from hello.queue1 Messages for queues: Wed Mar 29 23:20:52 CST 2017
 End sending message  : wed mar 29 23:20:52 cst 2017
 Consumer response  : WED MAR 29 23:20:52 CST 2017  Message processing completed 
------------------------------------------------
 Message sent successfully :CorrelationData [id=340d14e6-cfcc-4653-9f95-29b37d50f886]
 Start sending messages  : wed mar 29 23:20:53 cst 2017
SimpleAsyncTaskExecutor-1  Receive from hello.queue1 Messages for queues: Wed Mar 29 23:20:53 CST 2017
SimpleAsyncTaskExecutor-2  Receive from hello.queue2 Messages for queues: Wed Mar 29 23:20:53 CST 2017
 End sending message  : wed mar 29 23:20:53 cst 2017
 Consumer response  : WED MAR 29 23:20:53 CST 2017  Message processing completed 
------------------------------------------------
 Message sent successfully :CorrelationData [id=e4e01f89-d0d4-405e-80f0-85bb20238f34]
 Start sending messages  : wed mar 29 23:20:54 cst 2017
SimpleAsyncTaskExecutor-2  Receive from hello.queue1 Messages for queues: Wed Mar 29 23:20:54 CST 2017
SimpleAsyncTaskExecutor-1  Receive from hello.queue2 Messages for queues: Wed Mar 29 23:20:54 CST 2017
 End sending message  : wed mar 29 23:20:54 cst 2017
 Consumer response  : WED MAR 29 23:20:54 CST 2017  Message processing completed 
------------------------------------------------

If you need another type of exchange, Spring already provides implementations. All exchange types extend the org.springframework.amqp.core.AbstractExchange class.

Common exchange types are as follows:

Direct (DirectExchange): matches first, then delivers. A routing key is set on the binding, and a message is delivered to the bound queue only when its routing key matches that key exactly.

Topic (TopicExchange): forwards messages according to routing key patterns (the most flexible type).

Headers (HeadersExchange): routes messages by their header attributes instead of the routing key.

Fanout (FanoutExchange): forwards messages to all bound queues.
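
The routing decisions of the direct, fanout, and headers types can be compared side by side in a small plain Java model. This is a sketch for illustration only, not Spring AMQP or RabbitMQ code; the class ExchangeRoutingSketch and its methods are hypothetical. The matchAll flag stands for the x-match binding argument of a headers exchange (all or any).

```java
import java.util.Map;
import java.util.Objects;

class ExchangeRoutingSketch {

    /** Direct: deliver only when the binding key equals the routing key exactly. */
    static boolean directRoutes(String bindingKey, String routingKey) {
        return bindingKey.equals(routingKey);
    }

    /** Fanout: every bound queue receives the message; the keys are ignored. */
    static boolean fanoutRoutes(String bindingKey, String routingKey) {
        return true;
    }

    /**
     * Headers: compare the message headers with the binding arguments.
     * matchAll = true requires every binding argument to match (x-match=all);
     * matchAll = false requires at least one to match (x-match=any).
     */
    static boolean headersRoutes(Map<String, Object> bindingArgs,
                                 Map<String, Object> headers,
                                 boolean matchAll) {
        int matched = 0;
        for (Map.Entry<String, Object> arg : bindingArgs.entrySet()) {
            boolean present = headers.containsKey(arg.getKey());
            if (present && Objects.equals(arg.getValue(), headers.get(arg.getKey()))) {
                matched++;
            }
        }
        return matchAll ? matched == bindingArgs.size() : matched > 0;
    }
}
```

In this model a queue bound with key.1 to a direct exchange receives only messages whose routing key is exactly key.1, whereas the same queue bound to a fanout exchange receives every message regardless of key.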
