Steps for Configuring RabbitMQ in Spring Boot
- 2021-07-09 08:17:16
- OfStack
Brief introduction
RabbitMQ is message middleware that implements AMQP (Advanced Message Queuing Protocol). It is used to store and forward messages in distributed systems, and it performs well in terms of ease of use, scalability, and high availability.
Concepts:
Producer: the creator of messages, responsible for pushing messages to the message queue. Consumer: the final recipient of messages, responsible for listening to the corresponding queue and consuming its messages. Queue: the store for messages, responsible for holding the messages sent by producers. Exchange: responsible for distributing the messages generated by producers to queues according to certain rules. Binding: establishes the link between an exchange and a queue.
Mode:
SpringBoot Integrated RabbitMQ
1. Introducing maven dependencies
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>1.5.2.RELEASE</version>
</dependency>
2. Configure application.properties
# rabbitmq
spring.rabbitmq.host = dev-mq.a.pa.com
spring.rabbitmq.port = 5672
spring.rabbitmq.username = admin
spring.rabbitmq.password = admin
spring.rabbitmq.virtualHost = /message-test/
3. Write an AmqpConfiguration configuration file
package message.test.configuration;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.amqp.RabbitProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Spring configuration that declares the RabbitMQ queues, direct exchanges,
 * bindings, connection factory, listener container factory and template used
 * for message issuing and push-result handling.
 */
@Configuration
public class AmqpConfiguration {

    /** Name of the charset used when encoding message bodies. */
    public static final String MESSAGE_ENCODING = "UTF-8";

    // ---- Exchange names ----
    public static final String EXCHANGE_ISSUE = "exchange_message_issue";
    public static final String EXCHANGE_PUSH = "exchange_message_push";

    // ---- Queue names ----
    public static final String QUEUE_ISSUE_USER = "queue_message_issue_user";
    public static final String QUEUE_ISSUE_ALL_USER = "queue_message_issue_all_user";
    public static final String QUEUE_ISSUE_ALL_DEVICE = "queue_message_issue_all_device";
    public static final String QUEUE_ISSUE_CITY = "queue_message_issue_city";
    public static final String QUEUE_PUSH_RESULT = "queue_message_push_result";

    // ---- Routing keys used on the issue exchange ----
    public static final String ROUTING_KEY_ISSUE_USER = "routing_key_message_issue_user";
    public static final String ROUTING_KEY_ISSUE_ALL_USER = "routing_key_message_issue_all_user";
    public static final String ROUTING_KEY_ISSUE_ALL_DEVICE = "routing_key_message_issue_all_device";
    public static final String ROUTING_KEY_ISSUE_CITY = "routing_key_message_issue_city";

    /** Broker settings bound from the {@code spring.rabbitmq.*} properties. */
    @Autowired
    private RabbitProperties rabbitProperties;

    /** Declares the queue named {@link #QUEUE_ISSUE_USER}. */
    @Bean
    public Queue issueUserQueue() {
        final Queue userQueue = new Queue(QUEUE_ISSUE_USER);
        return userQueue;
    }

    /** Declares the queue named {@link #QUEUE_ISSUE_ALL_USER}. */
    @Bean
    public Queue issueAllUserQueue() {
        final Queue allUserQueue = new Queue(QUEUE_ISSUE_ALL_USER);
        return allUserQueue;
    }

    /** Declares the queue named {@link #QUEUE_ISSUE_ALL_DEVICE}. */
    @Bean
    public Queue issueAllDeviceQueue() {
        final Queue allDeviceQueue = new Queue(QUEUE_ISSUE_ALL_DEVICE);
        return allDeviceQueue;
    }

    /** Declares the queue named {@link #QUEUE_ISSUE_CITY}. */
    @Bean
    public Queue issueCityQueue() {
        final Queue cityQueue = new Queue(QUEUE_ISSUE_CITY);
        return cityQueue;
    }

    /** Declares the queue named {@link #QUEUE_PUSH_RESULT}. */
    @Bean
    public Queue pushResultQueue() {
        final Queue resultQueue = new Queue(QUEUE_PUSH_RESULT);
        return resultQueue;
    }

    /** Declares the direct exchange named {@link #EXCHANGE_ISSUE}. */
    @Bean
    public DirectExchange issueExchange() {
        final DirectExchange issue = new DirectExchange(EXCHANGE_ISSUE);
        return issue;
    }

    /**
     * Declares the direct exchange named {@link #EXCHANGE_PUSH}.
     *
     * <p>Constructor arguments, in order: the exchange name, the durable flag
     * and the auto-delete flag. Both flags are set to {@code true} here.
     */
    @Bean
    public DirectExchange pushExchange() {
        final boolean durable = true;
        final boolean autoDelete = true;
        return new DirectExchange(EXCHANGE_PUSH, durable, autoDelete);
    }

    /** Binds the issue-user queue to the issue exchange with {@link #ROUTING_KEY_ISSUE_USER}. */
    @Bean
    public Binding issueUserQueueBinding(@Qualifier("issueUserQueue") Queue queue,
            @Qualifier("issueExchange") DirectExchange exchange) {
        return bindWithKey(queue, exchange, ROUTING_KEY_ISSUE_USER);
    }

    /** Binds the issue-all-user queue to the issue exchange with {@link #ROUTING_KEY_ISSUE_ALL_USER}. */
    @Bean
    public Binding issueAllUserQueueBinding(@Qualifier("issueAllUserQueue") Queue queue,
            @Qualifier("issueExchange") DirectExchange exchange) {
        return bindWithKey(queue, exchange, ROUTING_KEY_ISSUE_ALL_USER);
    }

    /** Binds the issue-all-device queue to the issue exchange with {@link #ROUTING_KEY_ISSUE_ALL_DEVICE}. */
    @Bean
    public Binding issueAllDeviceQueueBinding(@Qualifier("issueAllDeviceQueue") Queue queue,
            @Qualifier("issueExchange") DirectExchange exchange) {
        return bindWithKey(queue, exchange, ROUTING_KEY_ISSUE_ALL_DEVICE);
    }

    /** Binds the issue-city queue to the issue exchange with {@link #ROUTING_KEY_ISSUE_CITY}. */
    @Bean
    public Binding issueCityQueueBinding(@Qualifier("issueCityQueue") Queue queue,
            @Qualifier("issueExchange") DirectExchange exchange) {
        return bindWithKey(queue, exchange, ROUTING_KEY_ISSUE_CITY);
    }

    /**
     * Binds the push-result queue to the push exchange via
     * {@code withQueueName()}, i.e. without an explicit routing-key constant.
     */
    @Bean
    public Binding pushResultQueueBinding(@Qualifier("pushResultQueue") Queue queue,
            @Qualifier("pushExchange") DirectExchange exchange) {
        final Binding binding = BindingBuilder.bind(queue).to(exchange).withQueueName();
        return binding;
    }

    /**
     * Builds a caching connection factory from the host, port, username,
     * password and virtual host held in {@link RabbitProperties}.
     */
    @Bean
    public ConnectionFactory defaultConnectionFactory() {
        final CachingConnectionFactory factory = new CachingConnectionFactory();
        factory.setHost(rabbitProperties.getHost());
        factory.setPort(rabbitProperties.getPort());
        factory.setUsername(rabbitProperties.getUsername());
        factory.setPassword(rabbitProperties.getPassword());
        factory.setVirtualHost(rabbitProperties.getVirtualHost());
        return factory;
    }

    /**
     * Creates the listener container factory on top of the default connection
     * factory. The acknowledge mode is {@link AcknowledgeMode#MANUAL}.
     */
    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
            @Qualifier("defaultConnectionFactory") ConnectionFactory connectionFactory) {
        final SimpleRabbitListenerContainerFactory containerFactory =
                new SimpleRabbitListenerContainerFactory();
        containerFactory.setConnectionFactory(connectionFactory);
        containerFactory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        return containerFactory;
    }

    /** Creates the template used to publish messages over the default connection factory. */
    @Bean
    public AmqpTemplate rabbitTemplate(
            @Qualifier("defaultConnectionFactory") ConnectionFactory connectionFactory) {
        final RabbitTemplate template = new RabbitTemplate(connectionFactory);
        return template;
    }

    /** Binds {@code queue} to {@code exchange} using the given routing key. */
    private static Binding bindWithKey(Queue queue, DirectExchange exchange, String routingKey) {
        return BindingBuilder.bind(queue).to(exchange).with(routingKey);
    }
}
3. Writing producers
// Serialize issueMessage to a JSON string and encode it with the charset named by
// AmqpConfiguration.MESSAGE_ENCODING.
// NOTE(review): body, issueMessage and rabbitTemplate are declared outside this excerpt.
body = JSON.toJSONString(issueMessage).getBytes(AmqpConfiguration.MESSAGE_ENCODING);
// Publish the encoded payload to EXCHANGE_ISSUE using the ROUTING_KEY_ISSUE_USER routing key.
rabbitTemplate.convertAndSend(AmqpConfiguration.EXCHANGE_ISSUE,
AmqpConfiguration.ROUTING_KEY_ISSUE_USER, body);
4. Writing consumers
/**
 * Handles a message delivered from the queue named
 * {@code AmqpConfiguration.QUEUE_PUSH_RESULT}.
 *
 * <p>The listener container factory declared in {@code AmqpConfiguration} sets
 * {@code AcknowledgeMode.MANUAL}, so this handler must acknowledge each delivery
 * itself; otherwise the message stays unacknowledged on the broker.
 *
 * @param data        raw message payload
 * @param channel     channel the message was delivered on, used to acknowledge it
 * @param deliveryTag delivery tag identifying this message on the channel
 * @throws IllegalStateException if the acknowledgement cannot be sent
 */
@RabbitListener(queues = AmqpConfiguration.QUEUE_PUSH_RESULT)
public void handlePushResult(@Payload byte[] data, Channel channel,
        @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
    // TODO: process the payload in `data` here, before acknowledging.

    try {
        // Acknowledge only this delivery (multiple = false).
        channel.basicAck(deliveryTag, false);
    } catch (java.io.IOException e) {
        // Wrap the checked exception so the method signature stays unchanged.
        throw new IllegalStateException("Failed to acknowledge delivery " + deliveryTag, e);
    }
}