Tutorial: working with the RabbitMQ message queue in Python

  • 2020-06-12 09:47:15
  • OfStack

Preface

RabbitMQ is a complete, reusable enterprise messaging system based on AMQP. It is open source, released under the Mozilla Public License.
MQ stands for Message Queue, a method of application-to-application communication. Applications communicate by writing messages (application data) to a queue and reading them from it, without needing a dedicated connection between them. Messaging means that programs communicate by sending data in messages rather than by calling each other directly, as technologies such as remote procedure calls do. Queueing means that applications communicate through queues, which removes the need for the sending and receiving applications to run at the same time.
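As a rough illustration of the idea only, the sketch below uses Python's standard-library queue.Queue inside a single process rather than RabbitMQ. It shows a sender that writes its messages and finishes before the receiver reads anything. The names send_all, receive_all and mailbox are made up for this example.

```python
import queue


def send_all(q, messages):
    """Producer side: write every message to the queue and return."""
    for message in messages:
        q.put(message)


def receive_all(q):
    """Consumer side: read messages until the queue is empty."""
    received = []
    while True:
        try:
            received.append(q.get_nowait())
        except queue.Empty:
            return received


mailbox = queue.Queue()
send_all(mailbox, ["order created", "order paid"])
# The sender has already finished. The receiver runs later and still
# gets every message, in the order it was sent.
print(receive_all(mailbox))
```

With RabbitMQ the queue lives in a separate broker process, so the two sides can also be different programs on different machines.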

Application scenarios:

RabbitMQ is one of the most popular message queues and has client libraries for many languages. Message queues are typically used in three kinds of scenario:

1. System integration and distributed system design. Subsystems are connected through messages, an approach that has evolved into an architectural style of its own: architecture through messaging.

2. Cases where synchronous processing seriously hurts throughput, such as logging. If every user action must be logged and the logging is done synchronously, the system's response time is bound to suffer. If log messages are sent to a message queue instead, the logging subsystem can consume them asynchronously.

3. High availability, as in e-commerce flash sales ("seckill" events). When an application server or database server receives a very large number of requests at once, it can go down. If the requests are first put on a message queue and then consumed by the server at its own pace, the load is smoothed out and availability improves.
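The logging scenario can be sketched in a few lines. This is a single-process illustration using only the standard library (queue.Queue and a thread stand in for RabbitMQ and the logging subsystem), and handle_request, log_worker and written are hypothetical names chosen for the example.

```python
import queue
import threading

log_queue = queue.Queue()
written = []  # stands in for a log file or a log database


def log_worker():
    """Logging subsystem: consume entries until a None sentinel arrives."""
    while True:
        entry = log_queue.get()
        if entry is None:
            break
        written.append(entry)


def handle_request(user, action):
    """Request handler: enqueue the log entry and return immediately."""
    log_queue.put(f"{user} {action}")
    return "ok"


worker = threading.Thread(target=log_worker)
worker.start()
for user in ("alice", "bob"):
    handle_request(user, "logged in")
log_queue.put(None)  # tell the worker to stop
worker.join()
print(written)
```

The request handler never waits for the log to be written; it only pays the cost of putting one item on the queue.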

1. Setting up the environment

First, install RabbitMQ on Linux:


# Environment: CentOS 7
yum install rabbitmq-server        # install RabbitMQ
systemctl start rabbitmq-server    # start the service
systemctl enable rabbitmq-server   # start the service at boot
systemctl stop firewalld           # temporarily stop the firewall

Then install pika, the Python 3 client library, with pip:


pip3 install pika

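Once everything is installed, you can open http://115.xx.xx.xx:15672/ to view and manage RabbitMQ through its own web page. This page is served by the management plugin, which has to be enabled (rabbitmq-plugins enable rabbitmq_management) if it is not already. The default administrator user name and password are both guest. Note that recent RabbitMQ versions only allow the guest user to connect from localhost by default.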

2. Adding messages to a queue


#!/usr/bin/env python3
# coding=utf-8
# @Time : 2017/6/13 19:25
# @Author : Shawn
# @Blog : https://blog.just666.cn
# @Email : shawnbluce@gmail.com
# @purpose : RabbitMQ_Producer
import pika
# Create the connection object
connection = pika.BlockingConnection(pika.ConnectionParameters(host='115.xx.xx.xx'))
# Create the channel object
channel = connection.channel()
# Declare a queue; it is created if it does not already exist
channel.queue_declare(queue='test_queue')
# Publish the messages
for i in range(10):
    channel.basic_publish(exchange='', routing_key='test_queue', body='hello,world' + str(i))
    print("sent...")
# Close the connection
connection.close()

3. Getting messages from a queue


#!/usr/bin/env python3
# coding=utf-8
# @Time : 2017/6/13 19:40
# @Author : Shawn
# @Blog : https://blog.just666.cn
# @Email : shawnbluce@gmail.com
# @purpose : RabbitMQ_Consumer
import pika
credentials = pika.PlainCredentials('guest', 'guest')
# Connect to the RabbitMQ server
connection = pika.BlockingConnection(pika.ConnectionParameters('115.xx.xx.xx', 5672, '/', credentials))
channel = connection.channel()
# Declare a queue; it is created if it does not already exist
channel.queue_declare(queue='test_queue')
# Define a callback function
def callback(ch, method, properties, body):
    print(body.decode('utf-8'))
    # no_ack=False below means RabbitMQ waits for an explicit acknowledgement
    ch.basic_ack(delivery_tag=method.delivery_tag)
# Tell RabbitMQ to deliver messages from the queue to callback.
# This is the pika 0.x signature; in pika 1.0 and later the call is
# channel.basic_consume(queue='test_queue', on_message_callback=callback, auto_ack=False)
channel.basic_consume(callback, queue='test_queue', no_ack=False)
print('waiting...')
# Start receiving. This blocks, calling callback for each message that arrives; press Ctrl+C to exit.
channel.start_consuming()

4. What if a consumer goes offline

Imagine a situation like this:

Suppose a consumer fetches n messages from the queue and crashes just as it is about to process them. RabbitMQ has an acknowledgement (ACK) mechanism that a consumer uses to confirm it has finished processing. Much like an ACK in networking, the queue does not remove a message as soon as a consumer has received it; it waits for the corresponding ACK. Once the consumer has received and processed the message, it sends an ACK to tell RabbitMQ that the message has been handled and can be deleted, and only then does RabbitMQ remove it from the queue. So even if a consumer drops offline, its unacknowledged messages stay in the queue for other consumers to process.

This is implemented in Python:

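The consumer contains this line: channel.basic_consume(callback, queue='test_queue', no_ack=False). Despite the double negative, no_ack=False means that acknowledgements are turned on: RabbitMQ keeps each message until the consumer explicitly acknowledges it, which the callback must do by calling ch.basic_ack(delivery_tag=method.delivery_tag) once processing is finished. With no_ack=True, RabbitMQ treats a message as acknowledged the moment it is delivered, so the message is lost if the consumer crashes while processing it. (pika 1.0 and later rename this parameter to auto_ack.)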
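A minimal sketch of a consumer that acknowledges each message by hand might look like the following. It assumes pika 1.x, where basic_consume takes on_message_callback and auto_ack; the names handle_message and consume, and the localhost default, are choices made for this example rather than anything from the original code.

```python
def handle_message(ch, method, properties, body):
    """Process one message, then acknowledge it."""
    print(body.decode('utf-8'))
    # Acknowledge only after processing has finished, so that a crash
    # before this line leaves the message in the queue for redelivery.
    ch.basic_ack(delivery_tag=method.delivery_tag)


def consume(host='localhost', queue_name='test_queue'):
    """Connect and consume with manual acknowledgements (blocks until Ctrl+C)."""
    # Imported here so that handle_message can be tried without pika or a broker.
    import pika
    connection = pika.BlockingConnection(pika.ConnectionParameters(host=host))
    channel = connection.channel()
    channel.queue_declare(queue=queue_name)
    # pika 1.x spelling; pika 0.x used basic_consume(callback, queue=..., no_ack=False)
    channel.basic_consume(queue=queue_name,
                          on_message_callback=handle_message,
                          auto_ack=False)
    channel.start_consuming()
```

Calling consume() requires a reachable RabbitMQ broker. To see the effect, stop the consumer before basic_ack runs (for example by raising an exception in handle_message) and check that the message is delivered again to the next consumer.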

5. What if RabbitMQ goes down

Even with ACKs, the data is still lost if RabbitMQ itself goes down. To guard against that, we can turn on persistence for RabbitMQ. RabbitMQ then writes the data to disk, so that the queue is still there the next time it starts.

This is implemented in Python:

So far we have declared the queue like this: channel.queue_declare(queue='test_queue'). To make a queue persistent, declare it like this instead: channel.queue_declare(queue='test_queue', durable=True). This line cannot simply be dropped into the existing code, though, because a queue named test_queue already exists and RabbitMQ does not allow an existing queue to be redeclared with different parameters, so use a new queue name for the durable queue. Even then, durability covers only the queue itself: after RabbitMQ crashes and restarts, the queue is still there but the messages in it are gone, unless each message is also published as persistent, like this: channel.basic_publish(exchange='', routing_key="test_queue", body=message, properties=pika.BasicProperties(delivery_mode=2)).
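Putting the two settings together, a producer that survives a broker restart could be sketched as below. The queue name durable_test_queue, the helper publish_persistent and the localhost default are assumptions made for this example; the pika calls themselves (queue_declare with durable=True and BasicProperties with delivery_mode=2) are the ones described above.

```python
def publish_persistent(channel, queue_name, bodies, properties):
    """Declare a durable queue and publish every body with the given properties."""
    channel.queue_declare(queue=queue_name, durable=True)
    for body in bodies:
        channel.basic_publish(exchange='',
                              routing_key=queue_name,
                              body=body,
                              properties=properties)


def main(host='localhost'):
    """Publish a few persistent messages to a durable queue on the given broker."""
    # Imported here so that publish_persistent can be tried without pika or a broker.
    import pika
    connection = pika.BlockingConnection(pika.ConnectionParameters(host=host))
    try:
        persistent = pika.BasicProperties(delivery_mode=2)  # 2 marks a message as persistent
        publish_persistent(connection.channel(),
                           'durable_test_queue',  # new name, so it cannot clash with test_queue
                           ['hello, world'],
                           persistent)
    finally:
        connection.close()
```

Calling main() requires a reachable broker. Consumers of this queue have to declare it with durable=True as well, otherwise their queue_declare call conflicts with the existing definition.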

6. The simplest publish-and-subscribe

The simplest publish-and-subscribe setup in RabbitMQ uses a fanout exchange. Each subscriber binds a queue to the exchange, publishers publish messages to that exchange, and every subscriber receives a copy of each message. However, because the messages are delivered to temporary, randomly named queues that the subscribers create, the subscribers must be started before the publisher: a message published while no queue is bound to the exchange is discarded.
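The behaviour can be pictured with a toy in-memory model. This is only an illustration of fanout semantics, not how RabbitMQ is implemented, and the class FanoutExchange and the queue names are invented for the example.

```python
class FanoutExchange:
    """Toy in-memory model of a fanout exchange."""

    def __init__(self):
        self.queues = {}

    def bind(self, queue_name):
        """Create a queue (a plain list) and bind it to the exchange."""
        self.queues[queue_name] = []

    def publish(self, message):
        """Copy the message into every bound queue; return how many received it."""
        for messages in self.queues.values():
            messages.append(message)
        return len(self.queues)


exchange = FanoutExchange()
print(exchange.publish("too early"))     # 0: no queue is bound, so the message is dropped
exchange.bind("subscriber_a")
exchange.bind("subscriber_b")
print(exchange.publish("Hello Python"))  # 2: each subscriber's queue gets its own copy
print(exchange.queues)
```

The first publish reaches nobody, which is why the subscribers have to be running before the publisher sends anything.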

Publisher code:


#!/usr/bin/env python3
# coding=utf-8
# @Time : 2017/6/13 20:21
# @Author : Shawn
# @Blog : https://blog.just666.cn
# @Email : shawnbluce@gmail.com
# @purpose : RabbitMQ_Publisher
import pika
# Create the connection object
connection = pika.BlockingConnection(pika.ConnectionParameters(host='115.xx.xx.xx'))
# Create the channel object
channel = connection.channel()
# Declare the exchange: exchange is its name, exchange_type is its type
# (older pika releases spelled this keyword type)
channel.exchange_declare(exchange='my_fanout',
                         exchange_type='fanout')
message = 'Hello Python'
# Send the message to the exchange
channel.basic_publish(exchange='my_fanout',  # the exchange to publish to
                      routing_key='',        # ignored by fanout exchanges
                      body=message)
connection.close()

Subscriber code:


#!/usr/bin/env python3
# coding=utf-8
# @Time : 2017/6/13 20:20
# @Author : Shawn
# @Blog : https://blog.just666.cn
# @Email : shawnbluce@gmail.com
# @purpose : RabbitMQ_Subscriber
import pika
credentials = pika.PlainCredentials('guest', 'guest')
# Connect to RabbitMQ
connection = pika.BlockingConnection(pika.ConnectionParameters('115.xx.xx.xx', 5672, '/', credentials))
channel = connection.channel()
# Declare the exchange: exchange is its name, exchange_type is its type
# (older pika releases spelled this keyword type)
channel.exchange_declare(exchange='my_fanout',
                         exchange_type='fanout')
# Create a randomly named queue. An empty name lets RabbitMQ choose one;
# exclusive=True makes the queue temporary, so it is deleted when the consumer disconnects
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
# Bind the queue to the exchange
channel.queue_bind(exchange='my_fanout',
                   queue=queue_name)
# Define the callback
def callback(ch, method, properties, body):
    print(body.decode('utf-8'))
# Consume messages from the queue.
# This is the pika 0.x signature; in pika 1.0 and later the call is
# channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)
channel.start_consuming()
