Python example code for implementing RabbitMQ message queues
- 2021-01-25 07:49:59
- OfStack
While studying how to use Redis as a message queue, I also looked at how to implement message queues with RabbitMQ. The following is a summary of three RabbitMQ exchange types: fanout, direct, and topic.
base.py:
import pika
# Build the credentials object from the user name and password; authentication is required when connecting remotely
credentials = pika.PlainCredentials("admin", "admin")
# ConnectionParameters(): holds the connection settings (host, port, virtual host, credentials)
# BlockingConnection(): opens the connection
connection = pika.BlockingConnection(pika.ConnectionParameters(
    "192.168.0.102", 5672, "/", credentials))
# Create a new channel
channel = connection.channel()
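Because base.py connects as soon as it is imported, the host and credentials are fixed in the source. As a minimal sketch of an alternative, the settings could be read from environment variables and the connection opened only when requested. The variable names (RABBITMQ_HOST and so on) and the helpers connection_settings and open_channel are hypothetical and not part of the original code; the defaults are the values used in base.py.

```python
import os


def connection_settings(env=os.environ):
    """Collect broker settings, falling back to the values used in base.py.

    The RABBITMQ_* variable names are assumptions made for this sketch.
    """
    return {
        "host": env.get("RABBITMQ_HOST", "192.168.0.102"),
        "port": int(env.get("RABBITMQ_PORT", "5672")),
        "virtual_host": env.get("RABBITMQ_VHOST", "/"),
        "username": env.get("RABBITMQ_USER", "admin"),
        "password": env.get("RABBITMQ_PASSWORD", "admin"),
    }


def open_channel(settings=None):
    """Open a blocking connection and return (connection, channel)."""
    import pika  # imported lazily so the settings helper works without a broker

    s = settings or connection_settings()
    credentials = pika.PlainCredentials(s["username"], s["password"])
    params = pika.ConnectionParameters(
        host=s["host"],
        port=s["port"],
        virtual_host=s["virtual_host"],
        credentials=credentials,
    )
    connection = pika.BlockingConnection(params)
    return connection, connection.channel()
```

A script would then call connection, channel = open_channel() instead of importing the two names from base.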
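fanout mode: the publisher sends a message to an exchange, the exchange copies it to every queue bound to it, and each consumer fetches the data from its own queue. The routing key is ignored. This is a broadcast, i.e. the publish/subscribe pattern. A queue is bound to the exchange like this: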
channel.queue_bind(exchange="logs", queue=queue_name)
Code:
publisher.py:
from base import channel, connection
# Declare the exchange; the publisher does not declare a queue
channel.exchange_declare(exchange="logs", exchange_type="fanout")  # broadcast
message = "hello fanout"
channel.basic_publish(
    exchange="logs",
    routing_key="",
    body=message
)
connection.close()
consumer.py:
from base import channel, connection
# Declare the exchange
channel.exchange_declare(exchange="logs", exchange_type="fanout")
# With an empty queue name, RabbitMQ assigns a random one; an exclusive queue is deleted when the consumer's connection closes
result = channel.queue_declare(queue="", exclusive=True)
# Get the queue name
queue_name = result.method.queue
# Bind the queue to the exchange
channel.queue_bind(exchange="logs", queue=queue_name)
def callback(ch, method, properties, body):
    print("body:%s" % body)
# pika 1.x signature: the callback is passed as on_message_callback;
# auto_ack=True because the callback never acknowledges explicitly
channel.basic_consume(
    queue=queue_name,
    on_message_callback=callback,
    auto_ack=True
)
channel.start_consuming()
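The fanout behaviour can be illustrated without a broker. The following is a small in-memory model, not RabbitMQ itself; the class FanoutExchange and the queue names are invented for this sketch. It shows that every bound queue receives a copy, that the routing key plays no role, and that a message published before any queue is bound is simply dropped.

```python
class FanoutExchange:
    """Toy model of a fanout exchange: every bound queue gets every message."""

    def __init__(self):
        self.queues = {}  # queue name -> list of delivered message bodies

    def bind(self, queue_name):
        self.queues.setdefault(queue_name, [])

    def publish(self, body, routing_key=""):
        # The routing key is accepted but ignored, as in fanout mode.
        for messages in self.queues.values():
            messages.append(body)


exchange = FanoutExchange()
exchange.publish("too early")      # no queue is bound yet, so nobody receives this
exchange.bind("queue_a")
exchange.bind("queue_b")
exchange.publish("hello fanout")   # copied to both queues
print(exchange.queues)
```

This is also why, with the scripts above, the consumer should be running before the publisher is started.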
direct mode: the publisher sends each message with one routing key (key1), and a queue is bound to the exchange with one or more binding keys (key2). If key1 is equal to the queue's binding key, or is one of its binding keys, the message is routed to that queue, and the corresponding consumer then fetches the data from the queue.
publisher.py:
from base import channel, connection
channel.exchange_declare(exchange="direct_test", exchange_type="direct")
message = "hello"
channel.basic_publish(
    exchange="direct_test",
    routing_key="info",  # routing key attached to the message
    body=message
)
connection.close()
consumer01.py:
from base import channel, connection
channel.exchange_declare(exchange="direct_test", exchange_type="direct")
result = channel.queue_declare(queue="", exclusive=True)
queue_name = result.method.queue
channel.queue_bind(
    exchange="direct_test",
    queue=queue_name,
    # Binding key, the same as the routing key used by the publisher
    routing_key="info"
)
def callback(ch, method, properties, body):
    print("body:%s" % body)
# pika 1.x signature: the callback is passed as on_message_callback
channel.basic_consume(
    queue=queue_name,
    on_message_callback=callback,
    auto_ack=True
)
channel.start_consuming()
consumer02.py:
from base import channel, connection
channel.exchange_declare(exchange="direct_test", exchange_type="direct")
result = channel.queue_declare(queue="", exclusive=True)
queue_name = result.method.queue
channel.queue_bind(
    exchange="direct_test",
    queue=queue_name,
    # Binding key, different from the publisher's routing key
    routing_key="error"
)
def callback(ch, method, properties, body):
    print("body:%s" % body)
# pika 1.x signature: the callback is passed as on_message_callback
channel.basic_consume(
    queue=queue_name,
    on_message_callback=callback,
    auto_ack=True
)
channel.start_consuming()
consumer03.py:
from base import channel, connection
channel.exchange_declare(exchange="direct_test", exchange_type="direct")
result = channel.queue_declare(queue="", exclusive=True)
queue_name = result.method.queue
key_list = ["info", "warning"]
for key in key_list:
    channel.queue_bind(
        exchange="direct_test",
        queue=queue_name,
        # One queue can be bound with several keys; a message is received if its routing key equals any of them
        routing_key=key
    )
def callback(ch, method, properties, body):
    print("body:%s" % body)
# pika 1.x signature: the callback is passed as on_message_callback
channel.basic_consume(
    queue=queue_name,
    on_message_callback=callback,
    auto_ack=True
)
channel.start_consuming()
Run (start the consumers first, so that their queues are bound before the message is published):
python consumer01.py
python consumer02.py
python consumer03.py
python publisher.py
Results:
consumer01.py: body:b'hello'
consumer02.py: no result received
consumer03.py: body:b'hello'
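The result above follows from exact key comparison. As a rough illustration, here is an in-memory model of a direct exchange; the class DirectExchange is invented for this sketch and the queue names simply reuse the consumer script names. It binds the same keys as the three consumers and publishes one message with routing key "info".

```python
class DirectExchange:
    """Toy model of a direct exchange: routing key must equal a binding key."""

    def __init__(self):
        self.bindings = []  # (binding_key, queue_name) pairs
        self.queues = {}    # queue name -> list of delivered message bodies

    def bind(self, queue_name, binding_key):
        self.queues.setdefault(queue_name, [])
        self.bindings.append((binding_key, queue_name))

    def publish(self, routing_key, body):
        # A queue receives the message once, even if several of its keys match.
        targets = {queue for key, queue in self.bindings if key == routing_key}
        for queue in targets:
            self.queues[queue].append(body)


exchange = DirectExchange()
exchange.bind("consumer01", "info")
exchange.bind("consumer02", "error")
for key in ["info", "warning"]:
    exchange.bind("consumer03", key)

exchange.publish("info", "hello")
for name, messages in sorted(exchange.queues.items()):
    print(name, messages)
```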
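The topic mode is harder to understand. Here is my understanding:
The publisher sends a message with a routing key (routing_key1), and a queue is bound with one or more binding keys (routing_key2), which act as patterns. Keys are words separated by dots; in a binding key, * stands for exactly one word and # for zero or more words. If routing_key1 matches any routing_key2, the exchange routes the message to the queue, and the receiver then fetches it from the queue. In essence this is an extension of direct.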
Binding mode:
Publisher side:
channel.basic_publish(
    exchange="topic_logs",
    routing_key=routing_key,
    body=message
)
Consumer side:
channel.queue_bind(
    exchange="topic_logs",
    queue=queue_name,
    routing_key=binding_key
)
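To make the matching between a routing key and a binding key concrete, here is a small pure-Python sketch of the topic rule. The function topic_matches is written for illustration only and is not part of pika or RabbitMQ; the broker performs this matching itself. It assumes the usual topic rules: words separated by dots, * for exactly one word, # for zero or more words.

```python
def topic_matches(binding_key, routing_key):
    """Return True if routing_key matches the topic pattern binding_key."""
    pattern = binding_key.split(".")
    words = routing_key.split(".")

    def match(p, w):
        if p == len(pattern):
            # Pattern exhausted: it matches only if all words are consumed too.
            return w == len(words)
        if pattern[p] == "#":
            # '#' may absorb zero or more of the remaining words.
            return any(match(p + 1, k) for k in range(w, len(words) + 1))
        if w == len(words):
            return False
        # '*' matches any single word; otherwise the words must be equal.
        return pattern[p] in ("*", words[w]) and match(p + 1, w + 1)

    return match(0, 0)


for binding in ["*.info", "mysql.*", "mysql.#", "#"]:
    print(binding, topic_matches(binding, "mysql.info"))
```

A binding key without wildcards behaves exactly like a direct binding, which is why topic can be seen as an extension of direct.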
publisher.py:
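For the topic run in this section, a publisher could look roughly like the following sketch. It is an assumption, not the author's listing: the exchange name topic_logs comes from the binding snippets above, the routing key "system.info" is a hypothetical starting value, and the helpers message_from_argv and publish are invented here. The channel is passed in as a parameter so that the function can be exercised without a broker.

```python
import sys

EXCHANGE = "topic_logs"      # exchange name used in the binding snippets
ROUTING_KEY = "system.info"  # hypothetical routing key, chosen for this sketch


def message_from_argv(argv):
    """Use the command-line arguments as the message body, with a fallback."""
    return " ".join(argv[1:]) or "hello topic"


def publish(channel, routing_key, message):
    """Declare the topic exchange and publish one message to it."""
    channel.exchange_declare(exchange=EXCHANGE, exchange_type="topic")
    channel.basic_publish(
        exchange=EXCHANGE,
        routing_key=routing_key,
        body=message,
    )
```

In the publisher script this would be used as: from base import channel, connection, then publish(channel, ROUTING_KEY, message_from_argv(sys.argv)), then connection.close().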
consumer01.py:
consumer02.py:
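The two topic consumers can share one shape and differ only in their binding keys. The sketch below is an assumption rather than the author's code: the binding keys in BINDING_KEYS are hypothetical values chosen so that a routing key such as "system.info" reaches both consumers while "mysql.info" reaches only consumer02.py, and the helper consume is invented here. It uses the pika 1.x keyword arguments.

```python
# Hypothetical binding keys for the two consumer scripts.
BINDING_KEYS = {
    "consumer01.py": ["system.*"],
    "consumer02.py": ["*.info"],
}


def callback(ch, method, properties, body):
    print("body:%s" % body)


def consume(channel, binding_keys, exchange="topic_logs"):
    """Bind a temporary exclusive queue with each key and start consuming."""
    channel.exchange_declare(exchange=exchange, exchange_type="topic")
    # Empty name: the broker generates one; exclusive: removed when the connection closes.
    result = channel.queue_declare(queue="", exclusive=True)
    queue_name = result.method.queue
    for key in binding_keys:
        channel.queue_bind(exchange=exchange, queue=queue_name, routing_key=key)
    channel.basic_consume(
        queue=queue_name,
        on_message_callback=callback,
        auto_ack=True,
    )
    channel.start_consuming()  # blocks until the consumer is stopped
```

Each script would import channel from base and call consume(channel, BINDING_KEYS["consumer01.py"]) or consume(channel, BINDING_KEYS["consumer02.py"]).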
Run (start the consumers first):
python consumer01.py
python consumer02.py
python publisher02.py "this is a topic test"
Results:
consumer01.py: body:b'this is a topic test'
consumer02.py: body:b'this is a topic test'
Both consumers received the message, because each queue was bound with a binding key that matches the publisher's routing_key.
Change routing_key in publisher.py to "mysql.info", then run the publisher and both consumers again.
Results:
consumer01.py: no result received
consumer02.py: body:b'this is a topic test'
From this example, we can see how topic works.
Reference: https://www.ofstack.com/article/150386.htm