SpringBoot integrates RabbitMQ and produces advanced features of the whole scene
- 2021-11-29 23:46:11
- OfStack
Catalog Summary
Integration
Dependencies and configurations
Producer Configuration Message Queuing Rules
Producer releases message
Consumers listen to messages
Summary
Integration
Dependencies and configurations
Producer Configuration Message Queuing Rules
Producer releases message
Consumers listen to messages
Producer Configuration Message Queuing Rules
Producer releases message
Consumers listen to messages
Summary
The integration scenario includes topic working mode (routingKey can satisfy four working modes, such as simple/work queue/publish subscription/route), confirm (message confirmation), return (message return), basicAck (message receipt), basicNack (reject receipt), DLX (Dead Letter Exchange dead letter queue) to realize delayed/timed tasks, etc.
Integration
Dependencies and configurations
The following content consumers and producers
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.4.RELEASE</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
</dependencies>
server.port=8090
spring.rabbitmq.host=192.168.168.10
spring.rabbitmq.port=5672
spring.rabbitmq.username=zheng123
spring.rabbitmq.password=zheng123
spring.rabbitmq.virtual-host=/zheng
spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.listener.direct.acknowledge-mode=manual
Producer Configuration Message Queuing Rules
Here are two configuration methods, and the first configuration is used in this integration example
@Configuration
public class TopicConfig {
// Declare queue
@Bean
public Queue topicQ1() {
return new Queue("topic_sb_mq_q1");
}
// Declare a queue and bind it to a dead-letter switch ( The return value can be written in two ways, optionally 1 All kinds are fine )
// Testing dead letters requires turning off listening in the original queue
@Bean
public Queue topicQ2() {
return QueueBuilder.durable("topic_sb_mq_q2")
.withArgument("x-dead-letter-exchange", "topicExchange")
.withArgument("x-dead-letter-routing-key", "changsha.f")
.withArgument("x-message-ttl", 10000)
.build();
Map<String,Object> arguments = new HashMap<>(2);
arguments.put("x-dead-letter-exchange","topicExchange");
arguments.put("x-dead-letter-routing-key","changsha.f");
arguments.put("x-message-ttl",10000);
return new Queue("topic_sb_mq_q2",true,false,false,arguments);
}
// Declaration exchange
@Bean
public TopicExchange setTopicExchange() {
return new TopicExchange("topicExchange");
}
// Declaration binding , need to declare 1 A routingKey
@Bean
public Binding bindTopicHebei1() {
return BindingBuilder.bind(topicQ1()).to(setTopicExchange()).with("changsha.*");
}
@Bean
public Binding bindTopicHebei2() {
return BindingBuilder.bind(topicQ2()).to(setTopicExchange()).with("#.beijing");
}
}
@Configuration
public class RabbitMqConfig {
// Define the name of the switch
public static final String EXCHANGE_NAME = "boot_topic_exchange";
// Define the name of the queue
public static final String QUEUE_NAME = "boot_queue";
//1 Declare the switch
@Bean("bootExchange")
public Exchange bootExchange(){
return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
}
//2 Declare the queue
@Bean("bootQueue")
public Queue bootQueue(){
return QueueBuilder.durable(QUEUE_NAME).build();
}
//3 The queue is bound to the switch
@Bean
public Binding bindQueueExchange(@Qualifier("bootQueue") Queue queue, @Qualifier("bootExchange") Exchange exchange){
// topic Mode is compatible with broadcast mode and routing mode. with("#") A similar broadcast pattern matches all subscribers; with("boot.1") The subscriber is specified similar to routing pattern matching
return BindingBuilder.bind(queue).to(exchange).with("boot.#").noargs();
}
}
Producer releases message
@RestController
public class ProducerController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping(value="/topicSend")
public Object topicSend(String routingKey,String message) throws AmqpException, UnsupportedEncodingException {
// Definition confirm Callback
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
if (ack) {
// confirmed
} else {
// nack-ed
}
});
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnCallback((msg, replyCode, replyText, exchange, routKey)->{
// return message
});
if(null == routingKey) {
routingKey="changsha.kf";
}
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
//fanout Mode only goes to exchange Send a message in. Distribute to exchange All under queue
rabbitTemplate.send("topicExchange", routingKey, new Message(message.getBytes("UTF-8"),messageProperties));
return "message sended : routingKey >"+routingKey+";message > "+message;
}
}
Consumers listen to messages
@Component
public class ConcumerReceiver {
//topic Mode
// Note that this pattern will have a priority matching principle. Such as sending routingKey=hunan.IT, That matches the hunan.*(hunan.IT,hunan.eco), After that, there will be no match *.ITd
@RabbitListener(queues="topic_sb_mq_q1")
public void topicReceiveq1(String msg,Message message, Channel channel) throws IOException {
// Message id
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
// message.getBody() todosomething
// Signing message
channel.basicAck(deliveryTag, true);
} catch (Exception e) {
// Refuse to sign
// No. 1 3 Parameters: requeue Return to the queue. If set to true The message returns to the queue , broker The message is resent to the consumer
channel.basicNack(deliveryTag, true, true);
}
}
@RabbitListener(queues="topic_sb_mq_q2")
public void topicReceiveq2(String message) {
System.out.println("Topic Mode topic_sb_mq_q2 received message : " +message);
}
}