php Message Push Method Based on Redis Message Queue

  • 2021-11-10 09:04:34
  • OfStack

Basic knowledge points

The following command is used to realize our message push

brpop blocking mode is deleted after getting the value from the right side of the queue brpoplpush is removed from the right side of queue A and placed into queue B from the left side

Logical analysis

Write the target of push_queue queue to send messages in the ordinary task script, and set 1 content to be pushed for the target, which will never expire In RedisPushQueue, brpoplpush is processed, and the processed value is put into temp_queue, which mainly prevents the program from crashing and causing push failure RedisAutoDeleteTempqueueItems handles temp_queue, where brpop is used

Code implementation

Common task script


<?php
foreach ($user_list as $item) {
  // Naming convention   Business type _ Operation _ID_ Random 6 Bit   Value   Customize   What I customize is " Push content "
  $k_name = 'rabbit_push_' . $item['uid'].'_'.rand(100000,999999);
  $redis->lPush('push_queue',$k_name);// Left-entry queue 
  $redis->set($k_name, ' Push content ');
}

RedisPushQueue


<?php
// Message Queuing Processing Push ~
//
 //  Daemon running 
 // nohup php YOURPATH/RedisPushQueue.php &  Start the daemon to run, and start it again after modifying the file 
// blpop  Go back if you have value   Blocking without value   The main thing is that this function is working   However, it is not safe. If the program crashes during execution, the contents of the queue will be caused 
 //  Permanent loss ~
 // BRPOPLPUSH  Blocking mode   Out on the right   Left-side entry   Enter from left when filling in the queue content 
 //
ini_set('default_socket_timeout', -1); // No timeout 
require_once 'YOURPARH/Rongcloud.php';

$redis = new \Redis();
$redis->connect('127.0.0.1', 6379);
$redis->select(2);// Switch to db2
$redis->setOption(\Redis::OPT_READ_TIMEOUT, -1);

// temp_queue Temporary queues prevent program crashes from causing loss of contents in queues  0 Means never timeout! 
While ($key = $redis->brpoplpush('push_queue', 'temp_queue', 0)) {
  if ($val = $redis->get($key)) {
    //rabbit_push_20_175990
    $arr = explode('_', $key);
    if (count($arr) != 4) {
      continue;
    }
    $id = $arr[2];
    push($id, $val);
    // Delete key Content 
    $redis->del($key);
  } 
}
function push($id, $v)
{
 // Push operation ~
}

RedisAutoDeleteTempqueueItems


<?php
/*  Automatic processing temp_queue Elements in, this operation is to prevent RedisPushQueue Deal with it when it crashes 
  The treatment idea is   Use brpop  Command blocking handling temp_queue The value in this queue, if you can get it " Value " Corresponding " Value " , description RedisPushQueue Execution failed 
  Return the value to lpush To push_queue In order to prepare for new processing 
  As to why it is used brpop Orders, because in RedisPushQueue We use the brpoplpush
 nohup php YOURPATH/RedisAutoDeleteTempqueueItems.php &  Start the daemon to run, and start it again after modifying the file 
*/
ini_set('default_socket_timeout', -1); // No timeout 
$redis = new \Redis();
$redis->connect('127.0.0.1', 6379);
$redis->select(2);// Switch to db2
$redis->setOption(\Redis::OPT_READ_TIMEOUT, -1);
  while($key_arr = $redis->brPop('temp_queue',0)){
  if(count($key_arr) != 2){
    continue;
  }
  $key =$key_arr[1];
  if($redis->get($key)){// Can get the value   Description RedisPushQueue Execution failed 
    $redis->lPush('push_queue',$key);
  }
}

Related articles: