PHP+RabbitMQ Complete code for message queuing

  • 2021-12-04 09:36:46
  • OfStack


Why use RabbitMq instead of ActiveMq or RocketMq?

First of all, I do not require 100% message acceptance from a business perspective, and I need to develop in conjunction with php, which has lower latency (subtlety level) than RocketMq. As for ActiveMq, it seems that there are many problems. RabbitMq has good support for various languages, so RabbitMq is chosen.

Install RabbitMQ corresponding to PHP first. Here, php_amqp is used. Different extension implementations will have slight differences.

Extended address of php: http://pecl.php. net/package/amqp

Specifically, official website shall prevail


config. php configuration information BaseMQ. php MQ base class ProductMQ. php producer class ConsumerMQ. php Consumer Category Consumer2MQ. php Consumer 2 (multiple available)


 return [
  // Configure 
  'host' => [
   'host' => '',
   'port' => '5672',
   'login' => 'guest',
   'password' => 'guest',
  // Switch 
  // Route 
  'routes' => [],


  * Created by PhpStorm.
  * User: pc
  * Date: 2018/12/13
  * Time: 14:11
 namespace MyObjSummary\rabbitMQ;
 /** Member
  *  AMQPChannel
  *  AMQPConnection
  *  AMQPEnvelope
  *  AMQPExchange
  *  AMQPQueue
  * Class BaseMQ
  * @package MyObjSummary\rabbitMQ
 class BaseMQ
  /** MQ Channel
   * @var \AMQPChannel
  public $AMQPChannel ;
  /** MQ Link
   * @var \AMQPConnection
  public $AMQPConnection ;
  /** MQ Envelope
   * @var \AMQPEnvelope
  public $AMQPEnvelope ;
  /** MQ Exchange
   * @var \AMQPExchange
  public $AMQPExchange ;
  /** MQ Queue
   * @var \AMQPQueue
  public $AMQPQueue ;
  /** conf
   * @var
  public $conf ;
  /** exchange
   * @var
  public $exchange ;
  /** link
   * BaseMQ constructor.
   * @throws \AMQPConnectionException
  public function __construct()
   $conf = require 'config.php' ;
    throw new \AMQPConnectionException('config error!');
   $this->conf  = $conf['host'] ;
   $this->exchange = $conf['exchange'] ;
   $this->AMQPConnection = new \AMQPConnection($this->conf);
   if (!$this->AMQPConnection->connect())
    throw new \AMQPConnectionException("Cannot connect to the broker!\n");
   * close link
  public function close()
  /** Channel
   * @return \AMQPChannel
   * @throws \AMQPConnectionException
  public function channel()
   if(!$this->AMQPChannel) {
    $this->AMQPChannel = new \AMQPChannel($this->AMQPConnection);
   return $this->AMQPChannel;
  /** Exchange
   * @return \AMQPExchange
   * @throws \AMQPConnectionException
   * @throws \AMQPExchangeException
  public function exchange()
   if(!$this->AMQPExchange) {
    $this->AMQPExchange = new \AMQPExchange($this->channel());
   return $this->AMQPExchange ;
  /** queue
   * @return \AMQPQueue
   * @throws \AMQPConnectionException
   * @throws \AMQPQueueException
  public function queue()
   if(!$this->AMQPQueue) {
    $this->AMQPQueue = new \AMQPQueue($this->channel());
   return $this->AMQPQueue ;
  /** Envelope
   * @return \AMQPEnvelope
  public function envelope()
   if(!$this->AMQPEnvelope) {
    $this->AMQPEnvelope = new \AMQPEnvelope();
   return $this->AMQPEnvelope;


 // Producer  P
 namespace MyObjSummary\rabbitMQ;
 require 'BaseMQ.php';
 class ProductMQ extends BaseMQ
  private $routes = ['hello','word']; // Route key
   * ProductMQ constructor.
   * @throws \AMQPConnectionException
  public function __construct()
  /**  Only control the success of sending   Do not accept whether consumers receive it or not 
   * @throws \AMQPChannelException
   * @throws \AMQPConnectionException
   * @throws \AMQPExchangeException
  public function run()
   // Channel 
   $channel = $this->channel();
   // Create Switch Objects 
   $ex = $this->exchange();
   // Message content 
   $message = 'product message '.rand(1,99999);
   // Start a transaction 
   $sendEd = true ;
   foreach ($this->routes as $route) {
    $sendEd = $ex->publish($message, $route) ;
    echo "Send Message:".$sendEd."\n";
   if(!$sendEd) {
   $channel->commitTransaction(); // Commit transaction 
   die ;
  (new ProductMQ())->run();
 }catch (\Exception $exception){
  var_dump($exception->getMessage()) ;


 // Consumer  C
 namespace MyObjSummary\rabbitMQ;
 require 'BaseMQ.php';
 class ConsumerMQ extends BaseMQ
  private $q_name = 'hello'; // Queue name 
  private $route = 'hello'; // Route key
   * ConsumerMQ constructor.
   * @throws \AMQPConnectionException
  public function __construct()
  /**  Accept a message   If terminated   There will be a message when reconnecting 
   * @throws \AMQPChannelException
   * @throws \AMQPConnectionException
   * @throws \AMQPExchangeException
   * @throws \AMQPQueueException
  public function run()
   // Create a switch 
   $ex = $this->exchange();
   $ex->setType(AMQP_EX_TYPE_DIRECT); //direct Type 
   $ex->setFlags(AMQP_DURABLE); // Persistence 
   //echo "Exchange Status:".$ex->declare()."\n";
   // Create Queue 
   $q = $this->queue();
   $q->setFlags(AMQP_DURABLE); // Persistence 
   //echo "Message Total:".$q->declareQueue()."\n";
   // Bind switches to queues and specify routing keys 
   echo 'Queue Bind: '.$q->bind($this->exchange, $this->route)."\n";
   // Receive messages in blocking mode 
   echo "Message:\n";
    $q->consume(function ($envelope,$queue){
     $msg = $envelope->getBody();
     echo $msg."\n"; // Processing messages 
     $queue->ack($envelope->getDeliveryTag()); // Send manually ACK Answer 
    //$q->consume('processMessage', AMQP_AUTOACK); // Automatic ACK Answer 
  (new ConsumerMQ)->run();
 }catch (\Exception $exception){
  var_dump($exception->getMessage()) ;


Related articles: