PHP + RabbitMQ: complete code for message queuing

  • 2021-12-04 09:36:46
  • OfStack

Preface

Why use RabbitMq instead of ActiveMq or RocketMq?

First of all, my business does not require 100% guaranteed message delivery, and I need to develop with PHP. RabbitMQ has lower latency (microsecond level) than RocketMQ. As for ActiveMQ, it seems to have many problems. RabbitMQ has good support for many languages, so RabbitMQ was chosen.

First install the RabbitMQ (AMQP) extension for PHP. Here php_amqp is used; other extension implementations differ slightly.

PHP extension address: http://pecl.php.net/package/amqp

For details, the official website is authoritative: http://www.rabbitmq.com/getstarted.html

Introduction

config.php: configuration information; BaseMQ.php: MQ base class; ProductMQ.php: producer class; ConsumerMQ.php: consumer class; Consumer2MQ.php: consumer 2 (there can be several consumers)

config.php


 <?php
 // Connection settings shared by the producer and consumer examples.
 // BaseMQ reads the 'host' and 'exchange' entries from the returned array.
 return [
  // Broker connection credentials, passed as-is to \AMQPConnection.
  'host' => [
   'host' => '127.0.0.1',
   // AMQP port as an integer (was the string '5672').
   'port' => 5672,
   'login' => 'guest',
   'password' => 'guest',
   'vhost'=>'/',
  ],
  // Name of the exchange that messages are published to / queues are bound to.
  'exchange'=>'word',
  // Routing keys (not read by the classes shown here; each class defines its own).
  'routes' => [],
 ];

BaseMQ.php


 <?php
 /**
  * Created by PhpStorm.
  * User: pc
  * Date: 2018/12/13
  * Time: 14:11
  */
 
 namespace MyObjSummary\rabbitMQ;
 
 /**
  * Shared base class for the RabbitMQ producer and consumer examples.
  *
  * Opens an AMQP connection on construction using the settings returned by
  * config.php, and lazily creates (and caches) the php-amqp objects the
  * subclasses work with:
  *  AMQPChannel
  *  AMQPConnection
  *  AMQPEnvelope
  *  AMQPExchange
  *  AMQPQueue
  *
  * Class BaseMQ
  * @package MyObjSummary\rabbitMQ
  */
 class BaseMQ
 {
  /** Cached channel, created on first call to channel()
   * @var \AMQPChannel
   */
  public $AMQPChannel ;
 
  /** Broker connection, opened in the constructor
   * @var \AMQPConnection
   */
  public $AMQPConnection ;
 
  /** Cached envelope, created on first call to envelope()
   * @var \AMQPEnvelope
   */
  public $AMQPEnvelope ;
 
  /** Cached exchange, created on first call to exchange()
   * @var \AMQPExchange
   */
  public $AMQPExchange ;
 
  /** Cached queue, created on first call to queue()
   * @var \AMQPQueue
   */
  public $AMQPQueue ;
 
  /** Connection credentials (the 'host' entry of config.php)
   * @var array
   */
  public $conf ;
 
  /** Exchange name (the 'exchange' entry of config.php)
   * @var string
   */
  public $exchange ;
 
  /**
   * Load config.php and connect to the broker.
   *
   * @throws \AMQPConnectionException if the configuration is malformed or the connection cannot be established
   */
  public function __construct()
  {
   // Anchor the path to this file's directory: a bare 'config.php' is looked up
   // through include_path / the working directory first and may load the wrong file.
   $conf = require __DIR__ . '/config.php' ;
   // Validate the shape before indexing, so a config file that forgets to
   // return an array fails with a clear error instead of a notice or type error.
   if(!is_array($conf) || !isset($conf['host'], $conf['exchange']) || !is_array($conf['host']))
    throw new \AMQPConnectionException('config error!');
   $this->conf  = $conf['host'] ;
   $this->exchange = $conf['exchange'] ;
   $this->AMQPConnection = new \AMQPConnection($this->conf);
   if (!$this->AMQPConnection->connect())
    throw new \AMQPConnectionException("Cannot connect to the broker!\n");
  }
 
  /**
   * Close the broker connection if it is open.
   *
   * Safe to call more than once.
   */
  public function close()
  {
   if ($this->AMQPConnection && $this->AMQPConnection->isConnected()) {
    $this->AMQPConnection->disconnect();
   }
  }
 
  /**
   * Return the channel, creating it on the connection on first use.
   *
   * @return \AMQPChannel
   * @throws \AMQPConnectionException
   */
  public function channel()
  {
   if(!$this->AMQPChannel) {
    $this->AMQPChannel = new \AMQPChannel($this->AMQPConnection);
   }
   return $this->AMQPChannel;
  }
 
  /**
   * Return the exchange object, creating it on first use and naming it
   * after the configured exchange. The exchange is not declared here.
   *
   * @return \AMQPExchange
   * @throws \AMQPConnectionException
   * @throws \AMQPExchangeException
   */
  public function exchange()
  {
   if(!$this->AMQPExchange) {
    $this->AMQPExchange = new \AMQPExchange($this->channel());
    $this->AMQPExchange->setName($this->exchange);
   }
   return $this->AMQPExchange ;
  }
 
  /**
   * Return the queue object, creating it on first use.
   * The caller is responsible for naming and declaring it.
   *
   * @return \AMQPQueue
   * @throws \AMQPConnectionException
   * @throws \AMQPQueueException
   */
  public function queue()
  {
   if(!$this->AMQPQueue) {
    $this->AMQPQueue = new \AMQPQueue($this->channel());
   }
   return $this->AMQPQueue ;
  }
 
  /**
   * Return the envelope object, creating an empty one on first use.
   *
   * @return \AMQPEnvelope
   */
  public function envelope()
  {
   if(!$this->AMQPEnvelope) {
    $this->AMQPEnvelope = new \AMQPEnvelope();
   }
   return $this->AMQPEnvelope;
  }
 }

ProductMQ.php


 <?php
 // Producer  P
 namespace MyObjSummary\rabbitMQ;
 require 'BaseMQ.php';
 /**
  * Producer: publishes one random message to every configured routing key
  * inside a single AMQP transaction.
  */
 class ProductMQ extends BaseMQ
 {
  private $routes = ['hello','word']; // Routing keys the message is published with
 
  /**
   * ProductMQ constructor.
   * @throws \AMQPConnectionException
   */
  public function __construct()
  {
   parent::__construct();
  }
 
  /**
   * Publish the message to every route and commit only if all publishes succeeded.
   *
   * Only delivery to the broker is checked; whether a consumer receives the
   * message is not. The exchange is not declared here, so it presumably has
   * to exist on the broker already (e.g. created by the consumer).
   * The script terminates when this method finishes normally.
   *
   * @throws \AMQPChannelException
   * @throws \AMQPConnectionException
   * @throws \AMQPExchangeException
   */
  public function run()
  {
   // Channel
   $channel = $this->channel();
   // Exchange object
   $ex = $this->exchange();
   // Message content
   $message = 'product message '.rand(1,99999);
   try {
    // Start a transaction
    $channel->startTransaction();
    $allSent = true ;
    foreach ($this->routes as $route) {
     $sendEd = $ex->publish($message, $route) ;
     echo "Send Message:".$sendEd."\n";
     // Remember a failure on ANY route, not just the last one. Compare strictly
     // with false: the extension reports other failures by throwing, and some
     // versions may return nothing on success.
     if ($sendEd === false) {
      $allSent = false ;
     }
    }
    // Commit and rollback are mutually exclusive; previously a commit was
    // issued even right after a rollback.
    if ($allSent) {
     $channel->commitTransaction();
    } else {
     $channel->rollbackTransaction();
    }
   } finally {
    // Always release the connection, including when publish/commit throws.
    $this->close();
   }
   die ;
  }
 }
 // Script entry point: run the producer once and dump the message of any exception.
 try {
  $producer = new ProductMQ();
  $producer->run();
 } catch (\Exception $error) {
  var_dump($error->getMessage());
 }

ConsumerMQ.php


 <?php
 // Consumer  C
 namespace MyObjSummary\rabbitMQ;
 require 'BaseMQ.php';
 /**
  * Consumer: binds the durable queue 'hello' to the configured exchange and
  * prints every message it receives, acknowledging each one manually.
  */
 class ConsumerMQ extends BaseMQ
 {
  private $q_name = 'hello'; // Queue name
  private $route = 'hello'; // Routing key the queue is bound with
 
  /**
   * ConsumerMQ constructor.
   * @throws \AMQPConnectionException
   */
  public function __construct()
  {
   parent::__construct();
  }
 
  /**
   * Declare the exchange and queue, bind them, and consume messages in blocking mode.
   *
   * Messages are acknowledged only after they have been processed.
   * The method does not return normally; the connection is closed when an
   * exception ends the consume loop.
   *
   * @throws \AMQPChannelException
   * @throws \AMQPConnectionException
   * @throws \AMQPExchangeException
   * @throws \AMQPQueueException
   */
  public function run()
  {
   try {
    // Exchange: direct type, durable
    $ex = $this->exchange();
    $ex->setType(AMQP_EX_TYPE_DIRECT);
    $ex->setFlags(AMQP_DURABLE);
    // Type and flags only take effect when the exchange is declared;
    // without this the bind below fails on a broker where it does not exist yet.
    $ex->declareExchange();
 
    // Queue: named, durable
    $q = $this->queue();
    $q->setName($this->q_name);
    $q->setFlags(AMQP_DURABLE);
    // Create the queue if it does not exist yet (same reason as above).
    $q->declareQueue();
 
    // Bind the queue to the exchange with the routing key
    echo 'Queue Bind: '.$q->bind($this->exchange, $this->route)."\n";
 
    // Receive messages in blocking mode
    echo "Message:\n";
    // The callback never returns false, so consume() is expected to keep
    // blocking; the outer loop re-enters it should it ever return.
    while(true){
     $q->consume(function ($envelope,$queue){
      $msg = $envelope->getBody();
      echo $msg."\n"; // Process the message
      $queue->ack($envelope->getDeliveryTag()); // Manual ACK after processing
     });
    }
   } finally {
    // Previously placed after the infinite loop and therefore unreachable.
    $this->close();
   }
  }
 }
 // Script entry point: start the consumer and dump the message of any exception.
 try {
  $consumer = new ConsumerMQ;
  $consumer->run();
 } catch (\Exception $error) {
  var_dump($error->getMessage());
 }

Summary


Related articles: