PHP + RabbitMQ: Complete Code for Message Queuing
- 2021-12-04 09:36:46
- OfStack
Preface
Why use RabbitMQ instead of ActiveMQ or RocketMQ?
First of all, from a business perspective I do not require 100% guaranteed message delivery, and I need to develop with PHP. RabbitMQ has lower latency (microsecond level) than RocketMQ. As for ActiveMQ, it seems to have many problems. RabbitMQ also has good client support for many languages, so RabbitMQ was chosen.
First install the RabbitMQ client extension for PHP. The php_amqp (PECL amqp) extension is used here; other extensions differ slightly in their API.
PHP extension page: http://pecl.php.net/package/amqp
For details, refer to the official website: http://www.rabbitmq.com/getstarted.html
Introduction
The example consists of the following files: config.php (configuration), BaseMQ.php (MQ base class), ProductMQ.php (producer class), ConsumerMQ.php (consumer class), and Consumer2MQ.php (a second consumer; any number of consumers can be added).
config.php
<?php
// Configuration array returned to whoever `require`s this file.
return [
// Broker connection settings; BaseMQ passes this sub-array to the \AMQPConnection constructor.
'host' => [
'host' => '127.0.0.1',
'port' => '5672',
'login' => 'guest',
'password' => 'guest',
'vhost'=>'/',
],
// Name of the exchange that messages are published to and that queues are bound to.
'exchange'=>'word',
// Routing keys. Left empty here; NOTE(review): the producer and consumer classes
// hard-code their own routing keys, so confirm whether anything reads this entry.
'routes' => [],
];
BaseMQ.php
<?php
/**
* Created by PhpStorm.
* User: pc
* Date: 2018/12/13
* Time: 14:11
*/
namespace MyObjSummary\rabbitMQ;
/**
 * Shared base class for the RabbitMQ producer and consumer examples.
 *
 * The constructor loads config.php and opens the broker connection. The
 * channel, exchange, queue and envelope objects are created on first use
 * and cached on the instance, so repeated calls return the same object.
 *
 * Wrapped php-amqp types: AMQPConnection, AMQPChannel, AMQPExchange,
 * AMQPQueue, AMQPEnvelope.
 *
 * @package MyObjSummary\rabbitMQ
 */
class BaseMQ
{
    /**
     * Cached channel, created by channel().
     * @var \AMQPChannel|null
     */
    public $AMQPChannel ;

    /**
     * Broker connection, opened in the constructor.
     * @var \AMQPConnection
     */
    public $AMQPConnection ;

    /**
     * Cached envelope, created by envelope().
     * @var \AMQPEnvelope|null
     */
    public $AMQPEnvelope ;

    /**
     * Cached exchange, created by exchange().
     * @var \AMQPExchange|null
     */
    public $AMQPExchange ;

    /**
     * Cached queue, created by queue().
     * @var \AMQPQueue|null
     */
    public $AMQPQueue ;

    /**
     * The 'host' section of config.php (connection credentials).
     * @var array
     */
    public $conf ;

    /**
     * Exchange name taken from the 'exchange' entry of config.php.
     * @var string
     */
    public $exchange ;

    /**
     * Load the configuration and connect to the broker.
     *
     * @throws \AMQPConnectionException if the configuration is malformed or
     *                                  the connection cannot be established
     */
    public function __construct()
    {
        // Resolve the config next to this file so the lookup does not depend
        // on the current working directory or include_path.
        $conf = require __DIR__ . '/config.php';

        // Fail with a clear message instead of an undefined-index notice
        // followed by an obscure connection error.
        if (!is_array($conf)
            || !isset($conf['host'], $conf['exchange'])
            || !is_array($conf['host'])
        ) {
            throw new \AMQPConnectionException('config error!');
        }

        $this->conf     = $conf['host'] ;
        $this->exchange = $conf['exchange'] ;

        $this->AMQPConnection = new \AMQPConnection($this->conf);

        // Do not test connect()'s return value: depending on the extension
        // version it returns true or nothing at all, and a null return would
        // be mistaken for a failure. isConnected() is reliable in both cases.
        $this->AMQPConnection->connect();
        if (!$this->AMQPConnection->isConnected()) {
            throw new \AMQPConnectionException("Cannot connect to the broker!\n");
        }
    }

    /**
     * Close the broker connection.
     */
    public function close()
    {
        $this->AMQPConnection->disconnect();
    }

    /**
     * Return the channel, creating it on the first call.
     *
     * @return \AMQPChannel
     * @throws \AMQPConnectionException
     */
    public function channel()
    {
        if (!$this->AMQPChannel) {
            $this->AMQPChannel = new \AMQPChannel($this->AMQPConnection);
        }

        return $this->AMQPChannel;
    }

    /**
     * Return the exchange object, creating it on the first call.
     *
     * Only the name is set here; type, flags and declaration are left to
     * the caller.
     *
     * @return \AMQPExchange
     * @throws \AMQPConnectionException
     * @throws \AMQPExchangeException
     */
    public function exchange()
    {
        if (!$this->AMQPExchange) {
            $this->AMQPExchange = new \AMQPExchange($this->channel());
            $this->AMQPExchange->setName($this->exchange);
        }

        return $this->AMQPExchange ;
    }

    /**
     * Return the queue object, creating it on the first call.
     *
     * The queue is returned unnamed and undeclared; the caller configures it.
     *
     * @return \AMQPQueue
     * @throws \AMQPConnectionException
     * @throws \AMQPQueueException
     */
    public function queue()
    {
        if (!$this->AMQPQueue) {
            $this->AMQPQueue = new \AMQPQueue($this->channel());
        }

        return $this->AMQPQueue ;
    }

    /**
     * Return the envelope object, creating it on the first call.
     *
     * @return \AMQPEnvelope
     */
    public function envelope()
    {
        if (!$this->AMQPEnvelope) {
            $this->AMQPEnvelope = new \AMQPEnvelope();
        }

        return $this->AMQPEnvelope;
    }
}
ProductMQ.php
<?php
// Producer P
namespace MyObjSummary\rabbitMQ;
require 'BaseMQ.php';
/**
 * Producer: publishes one generated message to the configured exchange
 * under each routing key, inside a single AMQP transaction.
 */
class ProductMQ extends BaseMQ
{
    /**
     * Routing keys the message is published with.
     * @var string[]
     */
    private $routes = ['hello','word'];

    /**
     * ProductMQ constructor.
     *
     * @throws \AMQPConnectionException
     */
    public function __construct()
    {
        parent::__construct();
    }

    /**
     * Publish the message under every routing key, all-or-nothing.
     *
     * This only confirms that the broker accepted the messages; it says
     * nothing about whether a consumer received them. The exchange is not
     * declared here, so it must already exist on the broker.
     *
     * The transaction is committed only if every publish succeeded and is
     * rolled back otherwise. The connection is closed in all cases,
     * including when an exception is thrown.
     *
     * @throws \AMQPChannelException
     * @throws \AMQPConnectionException
     * @throws \AMQPExchangeException
     */
    public function run()
    {
        try {
            $channel = $this->channel();
            $ex      = $this->exchange();

            // Message content
            $message = 'product message '.rand(1,99999);

            $channel->startTransaction();

            $allSent = true ;
            foreach ($this->routes as $route) {
                // publish() returns true/false in older extension versions and
                // nothing in newer ones (which throw on failure), so only an
                // explicit false counts as a failed send.
                $sent = $ex->publish($message, $route) !== false ;
                echo "Send Message:".$sent."\n";

                if (!$sent) {
                    // One failure dooms the whole transaction; stop publishing.
                    $allSent = false ;
                    break ;
                }
            }

            // Exactly one of commit / rollback ends the transaction. If an
            // exception is thrown above, neither runs and the transaction is
            // simply never committed.
            if ($allSent) {
                $channel->commitTransaction();
            } else {
                $channel->rollbackTransaction();
            }
        } finally {
            $this->close();
        }
    }
}
// Script entry point: run the producer and dump the message of any exception it raises.
try {
    (new ProductMQ())->run();
} catch (\Exception $e) {
    var_dump($e->getMessage());
}
ConsumerMQ.php
<?php
// Consumer C
namespace MyObjSummary\rabbitMQ;
require 'BaseMQ.php';
/**
 * Consumer: binds the durable queue "hello" to the configured exchange and
 * prints and acknowledges every message delivered to it.
 */
class ConsumerMQ extends BaseMQ
{
    /**
     * Queue name.
     * @var string
     */
    private $q_name = 'hello';

    /**
     * Routing key the queue is bound with.
     * @var string
     */
    private $route = 'hello';

    /**
     * ConsumerMQ constructor.
     *
     * @throws \AMQPConnectionException
     */
    public function __construct()
    {
        parent::__construct();
    }

    /**
     * Declare the exchange and queue, bind them, and consume messages.
     *
     * Messages are acknowledged manually after they are processed, so a
     * message that was delivered but not yet acknowledged when the consumer
     * is terminated is delivered again on reconnect.
     *
     * consume() blocks and keeps invoking the callback until the callback
     * returns false, which this one never does, so the method only ends
     * through an exception. The connection is closed when that happens.
     *
     * @throws \AMQPChannelException
     * @throws \AMQPConnectionException
     * @throws \AMQPExchangeException
     * @throws \AMQPQueueException
     */
    public function run()
    {
        try {
            // Durable direct exchange. It must be declared, not just
            // configured locally, or the bind below fails on a broker where
            // the exchange does not exist yet.
            $ex = $this->exchange();
            $ex->setType(AMQP_EX_TYPE_DIRECT);
            $ex->setFlags(AMQP_DURABLE);
            $ex->declareExchange();

            // Durable named queue, declared for the same reason.
            $q = $this->queue();
            $q->setName($this->q_name);
            $q->setFlags(AMQP_DURABLE);
            $q->declareQueue();

            // Bind the queue to the exchange under the routing key.
            echo 'Queue Bind: '.$q->bind($this->exchange, $this->route)."\n";

            // Receive messages in blocking mode. No surrounding loop is
            // needed: consume() itself loops, and calling it repeatedly
            // would register a new consumer each time.
            echo "Message:\n";
            $q->consume(static function ($envelope, $queue) {
                $msg = $envelope->getBody();
                echo $msg."\n"; // Process the message
                $queue->ack($envelope->getDeliveryTag()); // Manual ACK after processing
            });
        } finally {
            $this->close();
        }
    }
}
// Script entry point: run the consumer and dump the message of any exception it raises.
try {
    (new ConsumerMQ())->run();
} catch (\Exception $e) {
    var_dump($e->getMessage());
}
Summary