Redis implements an example of the queue principle
- 2020-05-30 21:17:20
- OfStack
Redis implements an example of the queue principle
Scene description:
· used to handle time-consuming requests, such as bulk sending of emails. If the sending is triggered directly on the web page, the application will run out of time
· high concurrency scenario, when the number of requests increases at a certain moment, the request can be written to the queue, and the background will process the requests
· panic buying scene, first-in, first-out mode
Command:
rpush + blpop or lpush + brpop
rpush: push the data to the right of the list
blpop: the client blocks until the queue has a value output
Simple queue:
simple.php
$stmt = $pdo->prepare('select id, cid, name from zc_goods limit 200000');$stmt->execute();while ($row = $stmt->fetch(PDO::FETCH_ASSOC)) { $redis->rPush('goods:task', json_encode($row));} $redis->close();
Take 20 million items and push the json data into the goods:task queue
queueBlpop.php
// Out of the team
while (true) {
// Block setting timeout to 3 seconds
$task = $redis->blPop(array('goods:task'), 3);
if ($task) {
$redis->rPush('goods:success:task', $task[1]);
$task = json_decode($task[1], true);
echo $task['id'] . ':' . $task['cid'] . ':' . 'handle success';
echo PHP_EOL;
} else {
echo 'nothing' . PHP_EOL; sleep(5);
}
}
Set the block time of blpop to 3 seconds, and save it to goods:success:task when there is no data in the queue to indicate the successful execution. When there is no data in the queue, the program will sleep for 10 seconds to re-check whether goods:task has data out of the queue
cli mode execution command:
php simple.phpphp queueBlpop.php
Priority queue
Ideas:
When blpop has more than one key, blpop traverses the key from left to right. Once one key pops up an element, the client immediately returns. Such as:
blpop key1 key2 key3 key4
Traversing from key1 to key4, if any key has a value, this value will pop up. If multiple key have values at the same time, key to the left will pop up first.
priority.php
// Set the priority queue
$high = 'goods:high:task';
$mid = 'goods:mid:task';
$low = 'goods:low:task';
$stmt = $pdo->prepare('select id, cid, name from zc_goods limit 200000');
$stmt->execute();
while ($row = $stmt->fetch(PDO::FETCH_ASSOC)) {
// cid Less than 100 Put it in a low level queue
if ($row['cid'] < 100) {
$redis->rPush($low, json_encode($row));
// cid 100 to 600 Between the intermediate queues
}elseif ($row['cid'] > 100 && $row['cid'] < 600) {
$redis->rPush($mid, json_encode($row));
}
// cid Is greater than 600 Put it in the advanced queue else { $redis->rPush($high, json_encode($row)); }
}$redis->close();
priorityBlop.php
// Priority queue
$high = 'goods:high:task';
$mid = 'goods:mid:task';$low = 'goods:low:task';
// Out of the team
while(true){ // The high-priority queue is on the left
$task = $redis->blPop(array($high, $mid, $low), 3);
if ($task) {
$task = json_decode($task[1], true);
echo $task['id'] . ':' . $task['cid'] . ':' . 'handle success';
echo PHP_EOL;
} else {
echo 'nothing' . PHP_EOL; sleep(5);
}
}
The queue with high priority is placed to the left of blpop command and sorted in sequence. The blpop command will pop up the values of high, mid and low queues in sequence
cli mode execution command:
php priority.phpphp priorityBlpop.php
Delays in the queue
Ideas:
You can use 1 ordered set to save deferred tasks, member to save task content, score to save (current time + delay time). Use time as score. The program simply compares the score of task 1 of the ordered set with the current time. If the current time is less than score, it means that all tasks of the ordered set have not yet reached the execution time.
delay.php
$stmt = $pdo->prepare('select id, cid, name from zc_goods limit 200000');$stmt->execute();while ($row = $stmt->fetch(PDO::FETCH_ASSOC)) { $redis->zAdd('goods:delay:task', time() + rand(1, 300), json_encode($row));}
Import 200, 000 tasks into an ordered set goods:delay:task, all of which are delayed until a second to 300 seconds later
delayHandle.php
while (true) {// since it is an ordered set, just judge the delay time of the first record, for example, the execution time of the first record is overdue // relative to the execution time of other tasks of the set
$rs = $redis->zRange('goods:delay:task', 0, 0, true);
// set the sleep time to 5 seconds
if (empty($rs)) {
echo 'no tasks , sleep 5 seconds' . PHP_EOL;sleep(5);continue;}
$taskJson = key($rs);
$delay = $rs[$taskJson];
$task = json_decode($taskJson, true);
$now = time();// Time to perform a delayed task
if ($delay <= $now) {
// lock the current task to avoid being modified by other clients when moving the delayed task to the task queue
simple.php
$stmt = $pdo->prepare('select id, cid, name from zc_goods limit 200000');$stmt->execute();while ($row = $stmt->fetch(PDO::FETCH_ASSOC)) { $redis->rPush('goods:task', json_encode($row));} $redis->close();
0
// move delay tasks to the task queue
simple.php
$stmt = $pdo->prepare('select id, cid, name from zc_goods limit 200000');$stmt->execute();while ($row = $stmt->fetch(PDO::FETCH_ASSOC)) { $redis->rPush('goods:task', json_encode($row));} $redis->close();
1
/ / releases the lock
simple.php
$stmt = $pdo->prepare('select id, cid, name from zc_goods limit 200000');$stmt->execute();while ($row = $stmt->fetch(PDO::FETCH_ASSOC)) { $redis->rPush('goods:task', json_encode($row));} $redis->close();
2
// delayed task not up to execution time
$sleep = $delay - $now;
// the maximum value is set to 2 seconds to ensure that if a new task (delay time 1 second) enters the set, it can be handled in time
simple.php
$stmt = $pdo->prepare('select id, cid, name from zc_goods limit 200000');$stmt->execute();while ($row = $stmt->fetch(PDO::FETCH_ASSOC)) { $redis->rPush('goods:task', json_encode($row));} $redis->close();
4
This file processes the delayed tasks in the ordered set and moves the delayed tasks to the task queue if the delayed tasks have reached their execution time
simple.php
$stmt = $pdo->prepare('select id, cid, name from zc_goods limit 200000');$stmt->execute();while ($row = $stmt->fetch(PDO::FETCH_ASSOC)) { $redis->rPush('goods:task', json_encode($row));} $redis->close();
5
Process tasks in the task queue
Execute the command in cli mode:
simple.php
$stmt = $pdo->prepare('select id, cid, name from zc_goods limit 200000');$stmt->execute();while ($row = $stmt->fetch(PDO::FETCH_ASSOC)) { $redis->rPush('goods:task', json_encode($row));} $redis->close();
6
If you have any questions, please leave a message or come to the site community to exchange discussion, thank you for reading, hope to help you, thank you for your support of the site!