Methods tutorial for working with Python on message queue RabbitMQ
- 2020-06-12 09:47:15
- OfStack
preface
RabbitMQ is a complete, reusable enterprise messaging system based on AMQP. He follows the Mozilla Public License open source protocol.
MQ, fully known as Message Queue, message queue (MQ) is a method of application to application communication. Applications communicate by reading and writing messages to and from the queue (data for the application) without the need for a dedicated connection to link them. Messaging refers to communication between programs by sending data in messages, rather than by direct calls to each other, which are usually used for technologies such as remote procedure calls. Queueing means that applications communicate through queues. The use of queues eliminates the need for receiving and sending applications to execute simultaneously.
Application Scenario:
RabbitMQ is undoubtedly one of the most popular message queues and has rich support for a variety of language environments. There are about three usage scenarios for message queues:
1. System integration and distributed system design. The various subsystems were linked through messages, and this solution evolved into an architectural style, "Architecture through messaging."
2. When the synchronous processing mode in the system seriously affects the throughput, such as logging. If we need to log all the user behaviors in the system, if we log synchronously, the response speed of the system is bound to be affected. When we send log messages to the message queue, the logging subsystem will consume the log messages asynchronously.
3. High availability of the system, such as the seckill scene of e-commerce. A system outage occurs when the application server or database server receives a large number of requests at some point. If the request can be forwarded to a message queue and then consumed by the server, it will smooth the request and improve the availability of the system.
1. Installation environment
The first is to install rabbitmq on Linux
# The environment for CentOS 7
yum install rabbitmq-server # The installation RabbitMQ
systemctl start rabbitmq-server # Start the
systemctl enable rabbitmq-server # Boot from the rev.
systemctl stop firewall-cmd # Temporary shutdown of firewall
Then install the Python3 development kit with pip
pip3 install pika
After installed software can access http: / / 115. xx. xx. xx: 15672 / access to own web RabbitMQ page to view and management. The default administrator user password is guest
2. Simply add a message to the queue
#!/usr/bin/env python3
# coding=utf-8
# @Time : 2017/6/13 19:25
# @Author : Shawn
# @Blog : https://blog.just666.cn
# @Email : shawnbluce@gmail.com
# @purpose : RabbitMQ_Producer
import pika
# Create a connection object
connection = pika.BlockingConnection(pika.ConnectionParameters(host='115.xx.xx.xx'))
# Create channel objects
channel = connection.channel()
# The specified 1 Five queues, created if the queue does not exist
channel.queue_declare(queue='test_queue')
# Commit message
for i in range(10):
channel.basic_publish(exchange='', routing_key='test_queue', body='hello,world' + str(i))
print("sent...")
# Close the connection
connection.close()
3. Simply get the message from the queue
#!/usr/bin/env python3
# coding=utf-8
# @Time : 2017/6/13 19:40
# @Author : Shawn
# @Blog : https://blog.just666.cn
# @Email : shawnbluce@gmail.com
# @purpose : RabbitMQ_Consumer
import pika
credentials = pika.PlainCredentials('guest', 'guest')
# Connect to the RabbitMQ The server
connection = pika.BlockingConnection(pika.ConnectionParameters('115.xx.xx.xx', 5672, '/', credentials))
channel = connection.channel()
# The specified 1 Five queues, created if the queue does not exist
channel.queue_declare(queue='test_queue')
# define 1 A callback function
def callback(ch, method, properties, body):
print(body.decode('utf-8'))
# tell RabbitMQ use callback To receive information
channel.basic_consume(callback, queue='test_queue', no_ack=False)
print('waiting...')
# It starts receiving information and goes into a blocking state until there is information in the queue callback Process. According to the ctrl+c To exit.
channel.start_consuming()
41,000 consumers went offline
Imagine a situation like this:
The consumer got n data from the message queue and was about to process it when it went down. There is one ACK in RabbieMQ that can be used to confirm the end of consumer processing. Similar to ACK in the network, the queue does not remove the data immediately after the consumer gets the data from the queue, but waits for the corresponding ACK. After the consumer gets the data and processes it, he will send an ACK package to the queue to inform RabbitMQ that the message has been processed and can be deleted. At this time, RabbitMQ will remove the data from the queue. So even if the customer drops the line, the data will still be in the queue for other customers to process.
This is implemented in Python:
The consumer has 1 line of code like this
channel.basic_consume(callback, queue='test_queue', no_ack=False)
, including
no_ack=False
Does not send a confirmation packet. Changing it to no_ack=True sends a confirmation packet to RabbitMQ after each processing to confirm that the message has been processed.
51.RabbitMQ is down
Even with the ACK package, 10,000 RabbitMQ will still lose that data. So we can set up a data persistent store for RabbitMQ. RabbitMQ persists the data to disk, ensuring that the queue is still there the next time it is started.
This is implemented in Python:
Let's declare a queue that looks like this
channel.queue_declare(queue='test_queue')
If you need to persist a queue, you can declare this
channel.queue_declare(queue='test_queue', durable=True)
. However, this line cannot be executed directly in the code because there is already a queue named test_queue. RabbitMQ does not allow the same queue to be declared differently, so a new queue name can be created to specify the data persistence store. But if that's all there is to say, the queue is still there after the RabbitMQ crash and restart, but the data in the queue is gone. Unless we declare queues like this
channel.basic_publish(exchange='', routing_key="test_queue", body=message, properties=pika.BasicProperties(delivery_mode = 2,))
.
6. The simplest publish-and-subscribe
The simplest publish-and-subscribe is called the Fanout pattern in RabbitMQ. In other words, subscribers subscribe to a channel, publishers publish messages to that channel, and all subscribers receive the message. However, because publishers need to use random queues created by subscribers, subscribers need to be started before publishers can be started.
Publisher code:
#!/usr/bin/env python3
# coding=utf-8
# @Time : 2017/6/13 20:21
# @Author : Shawn
# @Blog : https://blog.just666.cn
# @Email : shawnbluce@gmail.com
# @purpose : RabbitMQ_Publisher
import pika
# Create a connection object
connection = pika.BlockingConnection(pika.ConnectionParameters(host='115.xx.xx.xx'))
# Create channel objects
channel = connection.channel()
# Define the switch, exchange Represents the name of the switch, type According to the type
channel.exchange_declare(exchange='my_fanout',
type='fanout')
message = 'Hello Python'
# Send the message to the switch
channel.basic_publish(exchange='my_fanout', # The specified exchange
routing_key='', # fanout No configuration is required and the configuration will not take effect
body=message)
connection.close()
Subscriber code:
#!/usr/bin/env python3
# coding=utf-8
# @Time : 2017/6/13 20:20
# @Author : Shawn
# @Blog : https://blog.just666.cn
# @Email : shawnbluce@gmail.com
# @purpose : RabbitMQ_Subscriber
import pika
credentials = pika.PlainCredentials('guest', 'guest')
# Connect to the RabbitMQ
connection = pika.BlockingConnection(pika.ConnectionParameters('115.xx.xx.xx', 5672, '/', credentials))
channel = connection.channel()
# Define the switch and proceed exchange The statement, exchange Represents the name of the switch, type According to the type
channel.exchange_declare(exchange='my_fanout',
type='fanout')
# Random queue creation
result = channel.queue_declare(exclusive=True) # exclusive=True Set up temporary queue, when consumer When closed, the queue is deleted
queue_name = result.method.queue
# The queue with exchange To bind
channel.queue_bind(exchange='my_fanout',
queue=queue_name)
# Define callback methods
def callback(ch, method, properties, body):
print(body.decode('utf-8'))
# Get information from the queue
channel.basic_consume(callback,
queue=queue_name,
no_ack=True)
channel.start_consuming()
conclusion