PHP Example: Producer and Consumer Functions Based on a RabbitMQ Operation Class

  • 2021-10-16 01:10:15
  • OfStack

This article shows how to implement a producer and a consumer in PHP on top of a RabbitMQ operation class. It is shared here for your reference, as follows:

Precautions:

1. The accept.php consumer code must be run from the command line.

2. Change 'username'=>'asdf' and 'password'=>'123456' to your own account name and password.

RabbitMQCommand.php operation class code


<?php
/*
 * AMQP protocol helper class for accessing RabbitMQ.
 * Requires the php_amqp extension to be installed first.
 */
class RabbitMQCommand{
  /*
   * Wraps one AMQP connection together with a single direct exchange and a
   * single queue bound to it by a routing key. The connection is opened
   * lazily, on the first send() or run().
   */

  /*
   * Settings handed to AMQPConnection. setConfigs() stores the caller's
   * 'username' under the key 'login' and defaults 'vhost' to '/'.
   */
  public $configs = array();
  // Exchange name
  public $exchange_name = '';
  // Queue name
  public $queue_name = '';
  // Routing key used both for the binding and for publishing
  public $route_key = '';
  /*
   * Declare the exchange and queue as durable, default True.
   * NOTE(review): send() publishes without a delivery_mode attribute, so
   * message persistence itself is left at the broker/extension default.
   */
  public $durable = True;
  /*
   * Automatic deletion
   * exchange is deleted when all queues have finished using it
   * queue is deleted when last consumer unsubscribes
   */
  public $autodelete = False;
  /*
   * Mirrored queue: when enabled the queue is declared with the
   * 'x-ha-policy' => 'all' argument.
   * NOTE(review): RabbitMQ 3.0+ configures mirroring through broker policies
   * rather than this argument - confirm against the broker version in use.
   */
  public $mirror = False;
  // Live AMQP handles; all Null while disconnected.
  private $_conn = Null;
  private $_exchange = Null;
  private $_channel = Null;
  private $_queue = Null;

  /*
   * @configs array('host'=>$host,'port'=>5672,'username'=>$username,'password'=>$password,'vhost'=>'/')
   * @exchange_name, @queue_name, @route_key names used when the topology is declared
   * Throws Exception when @configs is not an array or lacks a required entry.
   */
  public function __construct($configs = array(), $exchange_name = '', $queue_name = '', $route_key = '') {
    $this->setConfigs($configs);
    $this->exchange_name = $exchange_name;
    $this->queue_name = $queue_name;
    $this->route_key = $route_key;
  }

  /*
   * Validate and normalise the connection settings.
   * host, port, username and password must all be present and non-empty.
   */
  private function setConfigs($configs) {
    if (!is_array($configs)) {
      throw new Exception('configs is not array');
    }
    // empty() instead of a bare index read: a missing key must end in the
    // exception below, not in an "undefined array key" warning.
    foreach (array('host', 'port', 'username', 'password') as $required) {
      if (empty($configs[$required])) {
        throw new Exception('configs is empty');
      }
    }
    if (empty($configs['vhost'])) {
      $configs['vhost'] = '/';
    }
    $configs['login'] = $configs['username'];
    unset($configs['username']);
    $this->configs = $configs;
  }

  /*
   * Set whether to persist, the default is True
   */
  public function setDurable($durable) {
    $this->durable = $durable;
  }

  /*
   * Set whether to delete automatically
   */
  public function setAutoDelete($autodelete) {
    $this->autodelete = $autodelete;
  }

  /*
   * Set whether to mirror or not
   */
  public function setMirror($mirror) {
    $this->mirror = $mirror;
  }

  /*
   * Open the AMQP connection (if needed) and declare the exchange/queue.
   * Throws Exception('cannot connection rabbitmq', 500) on connection
   * failure, with the AMQPConnectionException attached as "previous".
   */
  private function open() {
    if ($this->_conn && $this->_exchange && $this->_queue) {
      return;
    }
    try {
      if (!$this->_conn) {
        // Keep the handle only once connect() succeeded, so a failed attempt
        // is retried on the next call instead of leaving a dead connection.
        $conn = new AMQPConnection($this->configs);
        $conn->connect();
        $this->_conn = $conn;
      }
      $this->initConnection();
    } catch (AMQPConnectionException $ex) {
      $this->_queue = Null;
      $this->_exchange = Null;
      $this->_channel = Null;
      $this->_conn = Null;
      throw new Exception('cannot connection rabbitmq', 500, $ex);
    }
  }

  /*
   * Keep the connection, switch to another exchange / queue / routing key.
   * When already connected the new topology is declared immediately;
   * otherwise it is declared by the next send() or run().
   * Throws Exception (code 500) when any of the three names is empty.
   */
  public function reset($exchange_name, $queue_name, $route_key) {
    $this->exchange_name = $exchange_name;
    $this->queue_name = $queue_name;
    $this->route_key = $route_key;
    if ($this->_conn) {
      $this->initConnection();
      return;
    }
    // Not connected yet: there is no channel to declare on, so only validate
    // and make sure no stale handles survive.
    $this->assertTopologyNames();
    $this->_queue = Null;
    $this->_exchange = Null;
    $this->_channel = Null;
  }

  /*
   * Reject empty exchange / queue / routing-key names.
   */
  private function assertTopologyNames() {
    if (empty($this->exchange_name) || empty($this->queue_name) || empty($this->route_key)) {
      throw new Exception('rabbitmq exchange_name or queue_name or route_key is empty',500);
    }
  }

  /*
   * Build the flag bitmask shared by the exchange and queue declarations.
   */
  private function declareFlags() {
    $flags = AMQP_NOPARAM;
    if ($this->durable) {
      $flags |= AMQP_DURABLE;
    }
    if ($this->autodelete) {
      $flags |= AMQP_AUTODELETE;
    }
    return $flags;
  }

  /*
   * Create the channel, declare the direct exchange and the queue, and bind
   * the queue to the exchange with the routing key.
   */
  private function initConnection() {
    $this->assertTopologyNames();
    // Forget the previous exchange/queue first so that a failure part-way
    // through never leaves handles for the old names behind.
    $this->_exchange = Null;
    $this->_queue = Null;
    $this->_channel = new AMQPChannel($this->_conn);
    // setFlags() replaces the whole bitmask, so the flags are OR-ed together
    // and set once; setting them explicitly also means the result does not
    // depend on the extension's per-class defaults.
    $flags = $this->declareFlags();

    $exchange = new AMQPExchange($this->_channel);
    $exchange->setName($this->exchange_name);
    $exchange->setType(AMQP_EX_TYPE_DIRECT);
    $exchange->setFlags($flags);
    // Newer php_amqp builds name this declareExchange(); declare() is kept
    // as the fallback for builds that only have the old name.
    if (method_exists($exchange, 'declareExchange')) {
      $exchange->declareExchange();
    } else {
      $exchange->declare();
    }

    $queue = new AMQPQueue($this->_channel);
    $queue->setName($this->queue_name);
    $queue->setFlags($flags);
    if ($this->mirror) {
      $queue->setArgument('x-ha-policy', 'all');
    }
    if (method_exists($queue, 'declareQueue')) {
      $queue->declareQueue();
    } else {
      $queue->declare();
    }
    $queue->bind($this->exchange_name, $this->route_key);

    $this->_exchange = $exchange;
    $this->_queue = $queue;
  }

  /*
   * Disconnect and drop every AMQP handle. Safe to call repeatedly; the next
   * send() or run() reconnects.
   */
  public function close() {
    if ($this->_conn) {
      $this->_conn->disconnect();
    }
    $this->_queue = Null;
    $this->_exchange = Null;
    $this->_channel = Null;
    $this->_conn = Null;
  }

  /*
   * Serialization hook: disconnect, then expose every property except the
   * AMQP handles, which are rebuilt on demand after unserialize().
   */
  public function __sleep() {
    $this->close();
    $handles = array('_conn', '_exchange', '_channel', '_queue');
    return array_values(array_diff(array_keys(get_object_vars($this)), $handles));
  }

  public function __destruct() {
    $this->close();
  }

  /*
   * Producer sends message
   * Arrays are JSON-encoded; anything else is cast to string and trimmed.
   * Returns whatever AMQPExchange::publish() returns.
   * Throws Exception (code 500) when an array cannot be JSON-encoded.
   */
  public function send($msg) {
    $this->open();
    if (is_array($msg)) {
      $msg = json_encode($msg);
      if ($msg === false) {
        throw new Exception('cannot encode message as json, error code ' . json_last_error(), 500);
      }
    } else {
      $msg = trim(strval($msg));
    }
    return $this->_exchange->publish($msg, $this->route_key);
  }

  /*
   * Consumer
   * $fun_name = array($classobj,$function) or function name string
   * $autoack  Whether to acknowledge automatically
   *
   * function processMessage($envelope, $queue) {
      $msg = $envelope->getBody();
      echo $msg."\n"; // Processing messages
      $queue->ack($envelope->getDeliveryTag());// Manual acknowledgement
    }
   *
   * Returns False when no callback is given; otherwise it does not return,
   * because consume() is re-entered every time it comes back.
   */
  public function run($fun_name, $autoack = True){
    $this->open();
    if (!$fun_name || !$this->_queue) return False;
    while(True){
      if ($autoack) $this->_queue->consume($fun_name, AMQP_AUTOACK);
      else $this->_queue->consume($fun_name);
    }
  }
}

send.php producer code


<?php
// Producer demo: publishes 101 timestamp messages through RabbitMQCommand.
set_time_limit(0); // lift the script execution time limit
include_once 'RabbitMQCommand.php';

$producer = new RabbitMQCommand(
  array(
    'host' => '127.0.0.1',
    'port' => 5672,
    'username' => 'asdf',
    'password' => '123456',
    'vhost' => '/',
  ),
  'class-e-1', // exchange
  'class-q-1', // queue
  'class-r-1'  // routing key
);

// Counter runs 0..100 inclusive, one message per iteration.
$sent = 0;
while ($sent <= 100) {
  // date() without a timestamp argument formats the current time.
  $producer->send(date('Y-m-d H:i:s'));
  $sent++;
}
exit();

accept.php consumer code


<?php
// Consumer demo setup.
// NOTE(review): error_reporting(0) turns off all PHP error reporting for this
// process - confirm that is wanted for a long-running consumer.
error_reporting(0);
include_once 'RabbitMQCommand.php';

// Broker credentials, followed by the exchange / queue / routing key names.
$configs = array(
  'host' => '127.0.0.1',
  'port' => 5672,
  'username' => 'asdf',
  'password' => '123456',
  'vhost' => '/',
);
$ra = new RabbitMQCommand($configs, 'class-e-1', 'class-q-1', 'class-r-1');
class A{
  /*
   * Consume callback: appends "<body>|<delivery tag>" plus CRLF to the file
   * log<pid>.log in the current working directory, then acknowledges the
   * delivery on the queue.
   *
   * $envelope  object providing getBody() and getDeliveryTag()
   * $queue     object providing ack($deliveryTag)
   */
  public function processMessage($envelope, $queue) {
    $body = $envelope->getBody();
    $deliveryTag = $envelope->getDeliveryTag();
    // One log file per consumer process, keyed by its PID.
    $logFile = 'log' . posix_getpid() . '.log';
    $line = sprintf("%s|%s\r\n", $body, $deliveryTag);
    file_put_contents($logFile, $line, FILE_APPEND);
    $queue->ack($deliveryTag);
  }
}
// Start consuming with automatic acknowledgement disabled;
// A::processMessage() acknowledges each delivery itself.
$handler = array(new A(), 'processMessage');
$s = $ra->run($handler, false);

Readers interested in more PHP-related content can check the following topics on this site: "PHP Data Structure and Algorithm Tutorial", "php Programming Algorithm Summary", "php String (string) Usage Summary", "PHP Array (Array) Operation Skills Complete Book", "PHP Common Traversal Algorithms and Skills Summary" and "PHP Mathematical Operation Skills Summary".

I hope this article is helpful to everyone's PHP programming.


Related articles: