Redis implements an example of the queue principle

  • 2020-05-30 21:17:20
  • OfStack

Redis implements an example of the queue principle

Scene description:

· used to handle time-consuming requests, such as bulk sending of emails. If the sending is triggered directly on the web page, the application will run out of time

· high concurrency scenario, when the number of requests increases at a certain moment, the request can be written to the queue, and the background will process the requests

· panic buying scene, first-in, first-out mode

Command:


rpush + blpop  or  lpush + brpop

rpush: push the data to the right of the list

blpop: the client blocks until the queue has a value output

Simple queue:


simple.php
$stmt = $pdo->prepare('select id, cid, name from zc_goods limit 200000');$stmt->execute();while ($row = $stmt->fetch(PDO::FETCH_ASSOC)) {  $redis->rPush('goods:task', json_encode($row));} $redis->close();

Take 20 million items and push the json data into the goods:task queue


queueBlpop.php

//  Out of the team 
while (true) {  
//  Block setting timeout to 3 seconds   
$task = $redis->blPop(array('goods:task'), 3); 
  if ($task) { 
    $redis->rPush('goods:success:task', $task[1]);
    $task = json_decode($task[1], true);
    echo $task['id'] . ':' . $task['cid'] . ':' . 'handle success';
    echo PHP_EOL; 
  } else {
    echo 'nothing' . PHP_EOL;    sleep(5);  
 }
}

Set the block time of blpop to 3 seconds, and save it to goods:success:task when there is no data in the queue to indicate the successful execution. When there is no data in the queue, the program will sleep for 10 seconds to re-check whether goods:task has data out of the queue

cli mode execution command:


php simple.phpphp queueBlpop.php

Priority queue

Ideas:

When blpop has more than one key, blpop traverses the key from left to right. Once one key pops up an element, the client immediately returns. Such as:


blpop key1 key2 key3 key4

Traversing from key1 to key4, if any key has a value, this value will pop up. If multiple key have values at the same time, key to the left will pop up first.


priority.php

//  Set the priority queue 
$high = 'goods:high:task';
$mid = 'goods:mid:task';
$low = 'goods:low:task';
$stmt = $pdo->prepare('select id, cid, name from zc_goods limit 200000');
$stmt->execute();
while ($row = $stmt->fetch(PDO::FETCH_ASSOC)) {  
// cid  Less than 100 Put it in a low level queue   
if ($row['cid'] < 100) {
    $redis->rPush($low, json_encode($row));  

// cid 100 to 600 Between the intermediate queues   
}elseif ($row['cid'] > 100 && $row['cid'] < 600) {
    $redis->rPush($mid, json_encode($row));
}  
// cid  Is greater than 600 Put it in the advanced queue    else {    $redis->rPush($high, json_encode($row));  }
}$redis->close();

priorityBlop.php

//  Priority queue 
$high = 'goods:high:task';
$mid = 'goods:mid:task';$low = 'goods:low:task';
//  Out of the team 
while(true){  //  The high-priority queue is on the left   
  $task = $redis->blPop(array($high, $mid, $low), 3);
  if ($task) {
    $task = json_decode($task[1], true);
    echo $task['id'] . ':' . $task['cid'] . ':' . 'handle success';
    echo PHP_EOL;
  } else {
    echo 'nothing' . PHP_EOL;    sleep(5);
  }
}

The queue with high priority is placed to the left of blpop command and sorted in sequence. The blpop command will pop up the values of high, mid and low queues in sequence

cli mode execution command:


php priority.phpphp priorityBlpop.php

Delays in the queue

Ideas:

You can use 1 ordered set to save deferred tasks, member to save task content, score to save (current time + delay time). Use time as score. The program simply compares the score of task 1 of the ordered set with the current time. If the current time is less than score, it means that all tasks of the ordered set have not yet reached the execution time.


delay.php
$stmt = $pdo->prepare('select id, cid, name from zc_goods limit 200000');$stmt->execute();while ($row = $stmt->fetch(PDO::FETCH_ASSOC)) {  $redis->zAdd('goods:delay:task', time() + rand(1, 300), json_encode($row));}

Import 200, 000 tasks into an ordered set goods:delay:task, all of which are delayed until a second to 300 seconds later

delayHandle.php

while (true) {// since it is an ordered set, just judge the delay time of the first record, for example, the execution time of the first record is overdue // relative to the execution time of other tasks of the set


$rs = $redis->zRange('goods:delay:task', 0, 0, true);

// set the sleep time to 5 seconds


  if (empty($rs)) {    
        echo 'no tasks , sleep 5 seconds' . PHP_EOL;sleep(5);continue;}
   $taskJson = key($rs);  
       $delay = $rs[$taskJson];  
       $task = json_decode($taskJson, true);  
   $now = time();//  Time to perform a delayed task   
   if ($delay <= $now) {    

// lock the current task to avoid being modified by other clients when moving the delayed task to the task queue


simple.php
$stmt = $pdo->prepare('select id, cid, name from zc_goods limit 200000');$stmt->execute();while ($row = $stmt->fetch(PDO::FETCH_ASSOC)) {  $redis->rPush('goods:task', json_encode($row));} $redis->close();
0

// move delay tasks to the task queue


simple.php
$stmt = $pdo->prepare('select id, cid, name from zc_goods limit 200000');$stmt->execute();while ($row = $stmt->fetch(PDO::FETCH_ASSOC)) {  $redis->rPush('goods:task', json_encode($row));} $redis->close();
1

/ / releases the lock


simple.php
$stmt = $pdo->prepare('select id, cid, name from zc_goods limit 200000');$stmt->execute();while ($row = $stmt->fetch(PDO::FETCH_ASSOC)) {  $redis->rPush('goods:task', json_encode($row));} $redis->close();
2

// delayed task not up to execution time


 $sleep = $delay - $now;    

// the maximum value is set to 2 seconds to ensure that if a new task (delay time 1 second) enters the set, it can be handled in time


simple.php
$stmt = $pdo->prepare('select id, cid, name from zc_goods limit 200000');$stmt->execute();while ($row = $stmt->fetch(PDO::FETCH_ASSOC)) {  $redis->rPush('goods:task', json_encode($row));} $redis->close();
4

This file processes the delayed tasks in the ordered set and moves the delayed tasks to the task queue if the delayed tasks have reached their execution time


simple.php
$stmt = $pdo->prepare('select id, cid, name from zc_goods limit 200000');$stmt->execute();while ($row = $stmt->fetch(PDO::FETCH_ASSOC)) {  $redis->rPush('goods:task', json_encode($row));} $redis->close();
5

Process tasks in the task queue

Execute the command in cli mode:


simple.php
$stmt = $pdo->prepare('select id, cid, name from zc_goods limit 200000');$stmt->execute();while ($row = $stmt->fetch(PDO::FETCH_ASSOC)) {  $redis->rPush('goods:task', json_encode($row));} $redis->close();
6

If you have any questions, please leave a message or come to the site community to exchange discussion, thank you for reading, hope to help you, thank you for your support of the site!


Related articles: