Implement distributed locks and task queues based on Redis

  • 2020-05-13 03:46:59
  • OfStack

1. Introduction

Double 11 (Singles' Day) has only just passed. As everyone knows, e-commerce sites such as Tmall, JD.com, and Suning run many flash-sale ("seckill") activities. For example, at a set time a phone that originally cost 1999 is put on sale for only 999. Such an event brings a peak of user requests, possibly hundreds of thousands or even millions of concurrent attempts to grab the phone. Under this kind of high concurrency the database server, file server, and application server all come under great pressure and, in serious cases, may go down. A second problem is that flash-sale stock is limited. If only 10 phones are on sale, then under high concurrency thousands of requests try to update the database at once (for example, each time one of the 10 units is grabbed, a record is written to a table). The order of those updates is chaotic, and it is very easy to end up with more than 10 successful buyers for 10 units, which is a serious problem. So how do we solve the problems described above?

The techniques I share next can be used to address the above issues: distributed locks and task queues.

2. Implementation ideas

1. The idea behind a Redis distributed lock

The idea is very simple. The main Redis function involved is setnx(), which is the most important function for implementing a distributed lock. First, a lock identifier (Lock:order is used as the example here) is stored in Redis as a key and given an expiration time. When a request for Lock:order comes in, setnx() is used to try to insert Lock:order into Redis: it returns true if the key could be inserted (the lock was free) and false if the key already exists (the lock is held). The actual code is a little more complicated than that, and I will go into the details when analyzing the code.
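
To make the setnx-plus-expiry idea concrete, here is a minimal sketch that models it in plain PHP without a Redis server. InMemoryStore, setIfAbsent(), and the explicit $now argument are hypothetical names introduced only for illustration; they are not part of Redis or of the class analyzed later.

```php
<?php
// Hypothetical in-memory stand-in for a Redis string store (not a real client).
// $now is passed in explicitly so that expiry can be shown without sleeping.
final class InMemoryStore
{
    /** @var array<string, array{value: string, expiresAt: int}> */
    private array $items = [];

    // Mimics SETNX followed by EXPIRE: stores the key only if it is absent or expired.
    public function setIfAbsent(string $key, string $value, int $ttl, int $now): bool
    {
        if (isset($this->items[$key]) && $this->items[$key]['expiresAt'] > $now) {
            return false; // someone else still holds the key
        }
        $this->items[$key] = ['value' => $value, 'expiresAt' => $now + $ttl];
        return true;
    }
}

$store = new InMemoryStore();
$first = $store->setIfAbsent('Lock:order', 'worker-a', 15, 1000);  // lock is free
$second = $store->setIfAbsent('Lock:order', 'worker-b', 15, 1005); // still held by worker-a
$third = $store->setIfAbsent('Lock:order', 'worker-b', 15, 1016);  // worker-a's lock expired at 1015
var_dump($first, $second, $third);
```

The second call fails because the key is still alive, and the third succeeds only because the expiry has passed, which is exactly why the lock must always carry a survival time.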

2. The idea behind a Redis task queue

This implementation uses the Redis distributed lock described above together with the sorted set (zset) data structure in Redis. For example, the add() function of the zset can be used to enqueue a task, and the getScore() function of the zset is used when dequeuing one. Several tasks at the top of the queue can also be popped at once.
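
The sorted-set idea can be sketched in plain PHP before looking at the Redis version. This is only a model under my own assumptions: SortedSetQueue and its methods are hypothetical, an array of id => score stands in for the zset, and no locking is shown.

```php
<?php
// Hypothetical in-memory model of a sorted-set queue: task id => score (due time).
final class SortedSetQueue
{
    /** @var array<string, float> */
    private array $scores = [];

    // Enqueue like a zset add, but keep the existing entry if the id is already queued.
    public function enqueue(string $id, float $score): bool
    {
        if (array_key_exists($id, $this->scores)) {
            return false;
        }
        $this->scores[$id] = $score;
        return true;
    }

    // Return and remove up to $count tasks whose score is <= $now, lowest score first.
    public function pop(float $now, int $count = 1): array
    {
        asort($this->scores); // order by score, keeping the id keys
        $due = [];
        foreach ($this->scores as $id => $score) {
            if ($score > $now || count($due) >= $count) {
                break;
            }
            $due[] = ['id' => $id, 'score' => $score];
        }
        foreach ($due as $task) {
            unset($this->scores[$task['id']]);
        }
        return $due;
    }
}

$queue = new SortedSetQueue();
$queue->enqueue('order:2', 1002.0);
$queue->enqueue('order:1', 1001.0);
$duplicate = $queue->enqueue('order:1', 1009.0); // already queued, so not added again
$queue->enqueue('order:3', 2000.0);              // not due yet at time 1500
$popped = $queue->pop(1500.0, 5);
$ids = array_column($popped, 'id');
```

Using the timestamp as the score gives first-in, first-out order for free, and a score in the future naturally delays a task.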

Those are the basic ideas behind the distributed lock and the task queue. If anything is still unclear, take a look at the code implementation below.

3. Code analysis

(1) First, the code implementation of the Redis distributed lock

(1) To avoid a lock that can never be released for some special reason, a successfully acquired lock is given a survival time (set through a parameter of the lock method, or a default value). Once the survival time is exceeded, the lock is released automatically. The default survival time is short (on the order of seconds), so if a lock needs to be held for a long time, the expire method can be called to extend its survival time, for example inside a loop.

(2) With a system-level lock, when a process crashes for whatever reason the operating system reclaims the lock, so no resource is lost. A distributed lock has no such guarantee. If the survival time is set very long and the process crashes, or unlock is not called for some other reason, the lock is useless for the rest of that time: neither the restarted process nor any other process can enter the locked region.

First, the locking code. There are two main parameters. $timeout is how long to keep retrying to acquire the lock: during this time the method keeps trying until the timeout is reached. $expire is the other important parameter: the maximum survival time of the lock, in seconds, which must be greater than 0. If the lock has not been released when the survival time runs out, it is released automatically. The purpose of this parameter is explained in (1) above.

The method first takes the current time, then computes the moment at which waiting for the lock times out (a timestamp) and the moment at which the lock's survival time ends. The Redis key has the format "Lock:<lock name>". Inside the loop, the first step is to insert the key with setnx(), which stores the value only if the key does not exist; the value stored is the moment the lock's survival time ends. If the insert succeeds, the key's expiry is set, the lock is recorded in the $lockedNames array, and true is returned: the lock has been acquired. If the key already exists, nothing is inserted, and a stricter check follows: the remaining survival time of the key is read. If it is less than 0, the key has no survival time set (the key itself does exist, because the preceding setnx would otherwise have created it). That situation means some process instance crashed after a successful setnx and never called expire, so the current process can set the expiry directly and take the lock for itself. If no wait time was given or the maximum wait time has passed, the loop exits; otherwise the request is retried after $waitIntervalUs microseconds. That is the whole of the locking code.


/**
   * Acquire a lock
   * @param string  $name           Identifying name of the lock
   * @param integer $timeout        How long (seconds) to keep retrying to acquire the lock; 0 means return immediately after a failed attempt without waiting
   * @param integer $expire         Maximum lifetime of the lock (seconds), must be greater than 0; if the lock is not released within this time it is released automatically
   * @param integer $waitIntervalUs Interval (microseconds) to sleep before retrying after a failed attempt
   * @return boolean                true if the lock was acquired, false otherwise
   */
  public function lock($name, $timeout = 0, $expire = 15, $waitIntervalUs = 100000) {
    if ($name == null) return false;

    // Get the current time
    $now = time();
    // Moment at which waiting for the lock times out
    $timeoutAt = $now + $timeout;
    // Moment at which the lock's lifetime ends
    $expireAt = $now + $expire;

    $redisKey = "Lock:{$name}";
    while (true) {
      // Store the moment the lock expires under the key; after that moment the lock is released automatically
      $result = $this->redisString->setnx($redisKey, $expireAt);

      if ($result != false) {
        // Set the key's time to live (a duration in seconds, not the $expireAt timestamp)
        $this->redisString->expire($redisKey, $expire);
        // Record the lock in the lockedNames array
        $this->lockedNames[$name] = $expireAt;
        return true;
      }

      // Remaining time to live of the key, in seconds
      $ttl = $this->redisString->ttl($redisKey);

      // A ttl below 0 means the key has no time to live set (the key itself exists, because the setnx above would otherwise have created it)
      // This happens when a process crashed after a successful setnx, so the following expire was never called
      // In that case take the lock over and give it a time to live
      if ($ttl < 0) {
        $this->redisString->set($redisKey, $expireAt);
        $this->redisString->expire($redisKey, $expire);
        $this->lockedNames[$name] = $expireAt;
        return true;
      }

      /***** Retry section *****/
      // Exit if no wait time was given or the maximum wait time has passed
      if ($timeout <= 0 || $timeoutAt < microtime(true)) break;

      // Sleep for $waitIntervalUs microseconds, then try again
      usleep($waitIntervalUs);

    }

    return false;
  }
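
The crash window described above (setnx succeeds, then the process dies before expire is called) can also be closed by writing the value and the expiry in a single command. The following is a minimal sketch, assuming the client is the phpredis extension, whose set() accepts an options array; acquireLock() and the $token argument are hypothetical and are not part of the RedisLock class.

```php
<?php
// Assumes $redis behaves like the phpredis extension's \Redis class, whose
// set() accepts an options array such as ['nx', 'ex' => 15].
// acquireLock() and the token scheme are illustrative, not from the article.
function acquireLock(object $redis, string $name, string $token, int $expire = 15): bool
{
    // NX: only set when the key is absent. EX: attach the TTL in the same command,
    // so a crash cannot leave behind a lock without an expiry.
    return $redis->set("Lock:{$name}", $token, ['nx', 'ex' => max($expire, 1)]) === true;
}
```

With this form the "ttl less than 0" recovery branch should no longer be needed, because the key never exists without an expiry. A caller could pass something like bin2hex(random_bytes(16)) as the token.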

Next, the unlock code. Unlocking is much simpler: the parameter passed in is the lock name. The method first checks whether the lock is currently held and, if it is, removes the key from Redis through the deleteKey() function.


/**
   * Release a lock
   * @param string $name Identifying name of the lock
   * @return boolean     true if the lock was released, false otherwise
   */
  public function unlock($name) {
    // Check whether the lock is currently held
    if ($this->isLocking($name)) {
      // Delete the lock key
      if ($this->redisString->deleteKey("Lock:$name")) {
        // Remove the lock from lockedNames
        unset($this->lockedNames[$name]);
        return true;
      }
    }
    return false;
  }
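
unlock() checks the lock and then deletes the key, which is two separate round trips to Redis. A commonly used variation, which is not the approach taken in this article, stores a random token as the lock value and lets a small Lua script compare and delete in one atomic step. A sketch assuming a phpredis-style eval(); releaseLock(), RELEASE_SCRIPT, and the token are hypothetical names:

```php
<?php
// Assumes a phpredis-style client: eval($script, $args, $numKeys), where the first
// $numKeys entries of $args become KEYS and the rest become ARGV.
// releaseLock() and the token scheme are illustrative, not from the article.
const RELEASE_SCRIPT = <<<'LUA'
if redis.call('GET', KEYS[1]) == ARGV[1] then
    return redis.call('DEL', KEYS[1])
end
return 0
LUA;

function releaseLock(object $redis, string $name, string $token): bool
{
    // The comparison and the delete run as one atomic script on the server,
    // so a lock that expired and was taken by someone else is left alone.
    return (int) $redis->eval(RELEASE_SCRIPT, ["Lock:{$name}", $token], 1) === 1;
}
```

The design choice here is that only the holder of the token can release the lock, even if its own survival time has already run out.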

Here is also the method that releases all locks. It is really the same thing, with one extra loop over the held locks.

/**
   * Release all locks currently held
   * @return boolean true if every lock was released, false otherwise
   */
  public function unlockAll() {
    // This flag is used to indicate whether all locks have been released successfully 
    $allSuccess = true;
    foreach ($this->lockedNames as $name => $expireAt) {
      if (false === $this->unlock($name)) {
        $allSuccess = false;  
      }
    }
    return $allSuccess;
  }

That covers the whole idea and code implementation of a distributed lock built on Redis. Below is the code of the complete implementation class. Almost every line is commented, so it should be easy to understand and adapt. The entire class:


/**
 * A distributed lock implemented on Redis
 */
class RedisLock {
  private $redisString;
  private $lockedNames = [];

  public function __construct($param = NULL) {
    $this->redisString = RedisFactory::get($param)->string;
  }

  /**
   * Acquire a lock
   * @param string  $name           Identifying name of the lock
   * @param integer $timeout        How long (seconds) to keep retrying to acquire the lock; 0 means return immediately after a failed attempt without waiting
   * @param integer $expire         Maximum lifetime of the lock (seconds), must be greater than 0; if the lock is not released within this time it is released automatically
   * @param integer $waitIntervalUs Interval (microseconds) to sleep before retrying after a failed attempt
   * @return boolean                true if the lock was acquired, false otherwise
   */
  public function lock($name, $timeout = 0, $expire = 15, $waitIntervalUs = 100000) {
    if ($name == null) return false;

    // Get the current time
    $now = time();
    // Moment at which waiting for the lock times out
    $timeoutAt = $now + $timeout;
    // Moment at which the lock's lifetime ends
    $expireAt = $now + $expire;

    $redisKey = "Lock:{$name}";
    while (true) {
      // Store the moment the lock expires under the key; after that moment the lock is released automatically
      $result = $this->redisString->setnx($redisKey, $expireAt);

      if ($result != false) {
        // Set the key's time to live (a duration in seconds, not the $expireAt timestamp)
        $this->redisString->expire($redisKey, $expire);
        // Record the lock in the lockedNames array
        $this->lockedNames[$name] = $expireAt;
        return true;
      }

      // Remaining time to live of the key, in seconds
      $ttl = $this->redisString->ttl($redisKey);

      // A ttl below 0 means the key has no time to live set (the key itself exists, because the setnx above would otherwise have created it)
      // This happens when a process crashed after a successful setnx, so the following expire was never called
      // In that case take the lock over and give it a time to live
      if ($ttl < 0) {
        $this->redisString->set($redisKey, $expireAt);
        $this->redisString->expire($redisKey, $expire);
        $this->lockedNames[$name] = $expireAt;
        return true;
      }

      /***** Retry section *****/
      // Exit if no wait time was given or the maximum wait time has passed
      if ($timeout <= 0 || $timeoutAt < microtime(true)) break;

      // Sleep for $waitIntervalUs microseconds, then try again
      usleep($waitIntervalUs);

    }

    return false;
  }

  /**
   * Release a lock
   * @param string $name Identifying name of the lock
   * @return boolean     true if the lock was released, false otherwise
   */
  public function unlock($name) {
    // Check whether the lock is currently held
    if ($this->isLocking($name)) {
      // Delete the lock key
      if ($this->redisString->deleteKey("Lock:$name")) {
        // Remove the lock from lockedNames
        unset($this->lockedNames[$name]);
        return true;
      }
    }
    return false;
  }

  /**
   * Release all locks currently held
   * @return boolean true if every lock was released, false otherwise
   */
  public function unlockAll() {
    // This flag is used to indicate whether all locks have been released successfully 
    $allSuccess = true;
    foreach ($this->lockedNames as $name => $expireAt) {
      if (false === $this->unlock($name)) {
        $allSuccess = false;  
      }
    }
    return $allSuccess;
  }

  /**
   * Give a held lock a new lifetime, which must be greater than 0
   * @param string  $name   Identifying name of the lock
   * @param integer $expire New lifetime of the lock (seconds)
   * @return boolean        true if the lifetime was set, false otherwise
   */
  public function expire($name, $expire) {
    // First check whether the lock is currently held
    if ($this->isLocking($name)) {
      // The lifetime must be greater than 0
      $expire = max($expire, 1);
      // Extend the lifetime of the lock
      if ($this->redisString->expire("Lock:$name", $expire)) {
        return true;
      }
    }
    return false;
  }

  /**
   * Check whether this instance currently holds the lock with the given name
   * @param string $name Identifying name of the lock
   * @return boolean     true if the lock is held, false otherwise
   */
  public function isLocking($name) {
    // First check whether lockedNames[$name] records this lock
    if (isset($this->lockedNames[$name])) {
      // Compare the recorded value with the value stored in Redis (a comparison, not an assignment)
      return (string)$this->lockedNames[$name] === (string)$this->redisString->get("Lock:$name");
    }

    return false;
  }

}

(2) Code analysis of the task queue implemented with Redis

(1) The task queue holds operations in the business logic that can be processed asynchronously: they are put into the queue and removed from it after being processed in other threads.

(2) The queue uses the distributed lock and other logic to guarantee the uniqueness of enqueue and dequeue operations.

(3) This queue differs from a normal queue. The id used when enqueuing identifies repeated entries: the queue holds only one record per id, so enqueuing an id that is already queued does not append a second record.

Now the enqueue code. First the parameters are validated, then the locking mechanism described above is used to take the lock. The current timestamp is used as the score when enqueuing. The task is then added with the add() method of the zset data structure, and once that is done the lock is released. That completes an enqueue operation.


/**
   * Enqueue a task
   * @param string       $name          Name of the queue
   * @param string|array $id            Task id (or an array of ids)
   * @param integer      $timeout       Timeout for enqueuing (seconds)
   * @param integer      $afterInterval Delay (seconds) added to the current time to form the score
   * @return boolean                    true if enqueued, false on invalid arguments or lock failure
   */
  public function enqueue($name, $id, $timeout = 10, $afterInterval = 0) {
    // Validate the arguments
    if (empty($name) || empty($id) || $timeout <= 0) return false;

    // Acquire the lock
    if (!$this->_redis->lock->lock("Queue:{$name}", $timeout)) {
      Logger::get('queue')->error("enqueue failed because of lock failure: name = $name, id = $id");
      return false;
    }
    
    // Use the current timestamp (plus the delay) as the score
    $score = microtime(true) + $afterInterval;
    // Enqueue
    foreach ((array)$id as $item) {
      // Only add the id if it is not already in the queue
      if (false === $this->_redis->zset->getScore("Queue:$name", $item)) {
        $this->_redis->zset->add("Queue:$name", $score, $item);
      }
    }
    
    // Release the lock
    $this->_redis->lock->unlock("Queue:$name");

    return true;

  }

Next, the dequeue code. Dequeuing a task requires both $id and $score. If $score matches the score stored in the queue, the task is dequeued; otherwise the task is considered to have been enqueued again, and the current operation is treated as a failure. The method first validates the parameters and takes the lock. It then uses getScore() to read the score stored in Redis for the id and compares it with the $score passed in. If the two are equal, the task is removed with the delete() method of the zset, and finally the lock is released. That is the dequeue code.


/**
   * Dequeue a task; $id and $score must both be given
   * If $score matches the score in the queue the task is dequeued; otherwise the task is considered to have been enqueued again and the operation is treated as a failure
   *
   * @param string  $name    Name of the queue
   * @param string  $id      Task id
   * @param float   $score   Score of the task, returned together with the task when it is fetched from the queue; the task is dequeued only if $score matches the value in the queue
   * @param integer $timeout Timeout for acquiring the lock (seconds)
   * @return boolean         Whether the task was dequeued; false can mean the Redis operation failed, or that $score does not match the value in the queue (the task was enqueued again by another thread after it was fetched)
   */
  public function dequeue($name, $id, $score, $timeout = 10) {
    // Validate the arguments
    if (empty($name) || empty($id) || empty($score)) return false;
    
    // Acquire the lock
    if (!$this->_redis->lock->lock("Queue:$name", $timeout)) {
      Logger::get('queue')->error("dequeue failed because of lock failure: name = $name, id = $id");
      return false;
    }
    
    // Dequeue
    // First read the score stored in Redis
    $serverScore = $this->_redis->zset->getScore("Queue:$name", $id);
    $result = false;
    // Check whether the score passed in equals the score in Redis
    if ($serverScore == $score) {
      // Delete the $id
      $result = (bool)$this->_redis->zset->delete("Queue:$name", $id);
      if ($result == false) {
        Logger::get('queue')->error("dequeue failed because of redis delete failure: name = $name, id = $id");
      }
    }
    // Release the lock
    $this->_redis->lock->unlock("Queue:$name");

    return $result;
  }
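
The reason dequeue() insists on a matching score is easier to see in a small model. The sketch below uses a plain PHP array of id => score in place of the Redis zset and leaves out locking; dequeueIfUnchanged() and the sample values are hypothetical.

```php
<?php
// Hypothetical in-memory model: task id => score, standing in for the Redis zset.
$queue = ['order:1' => 1001.5];

// Remove $id only when the caller's $score still matches the stored one.
function dequeueIfUnchanged(array &$queue, string $id, float $score): bool
{
    if (!array_key_exists($id, $queue) || $queue[$id] !== $score) {
        return false; // task is gone, or it was queued again with a new score
    }
    unset($queue[$id]);
    return true;
}

$seenScore = $queue['order:1'];  // a worker reads the task together with its score
$queue['order:1'] = 1200.0;      // meanwhile the task is queued again with a new score
$stale = dequeueIfUnchanged($queue, 'order:1', $seenScore); // score changed: nothing removed
$fresh = dequeueIfUnchanged($queue, 'order:1', 1200.0);     // score matches: task removed
var_dump($stale, $fresh);
```

The score acts as a version stamp: a worker that finished an older copy of the task cannot accidentally remove the newer one.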

Anyone who has studied data structures will know that a queue also supports operations such as popping the value at the top. Besides enqueue and dequeue, I have also implemented methods that fetch several tasks from the top of the queue and dequeue them. If you are interested, look at the code, and leave a message if something is unclear; I will not analyze it further here.


/**
   * Fetch several tasks from the top of the queue and dequeue them
   * @param string  $name    Name of the queue
   * @param integer $count   Number of tasks to fetch
   * @param integer $timeout Timeout for acquiring the lock (seconds)
   * @return array|false     Array of the form [0=>['id'=> , 'score'=> ], 1=>['id'=> , 'score'=> ], 2=>['id'=> , 'score'=> ]]; when $count is 1, a single ['id'=> , 'score'=> ] entry, or false if nothing was fetched
   */
  public function pop($name, $count = 1, $timeout = 10) {
    // Validate the arguments
    if (empty($name) || $count <= 0) return []; 
    
    // Acquire the lock
    if (!$this->_redis->lock->lock("Queue:$name", $timeout)) {
      Logger::get('queue')->error("pop failed because of lock failure: name = $name, count = $count");
      return false;
    }
    
    // Fetch several tasks
    $result = [];
    $array = $this->_redis->zset->getByScore("Queue:$name", false, microtime(true), true, false, [0, $count]);

    // Put them in the $result array and delete the corresponding ids from Redis
    foreach ($array as $id => $score) {
      $result[] = ['id'=>$id, 'score'=>$score];
      $this->_redis->zset->delete("Queue:$name", $id);
    }

    // Release the lock
    $this->_redis->lock->unlock("Queue:$name");

    return $count == 1 ? (empty($result) ? false : $result[0]) : $result;
  }

That covers the whole idea and code implementation of a task queue built on Redis. Below is the code of the complete implementation class. Almost every line is commented, so it should be easy to understand and adapt. The entire class:


/**
 *  Task queue 
 * 
 */
class RedisQueue {
  private $_redis;

  public function __construct($param = null) {
    $this->_redis = RedisFactory::get($param);
  }

  /**
   * Enqueue a task
   * @param string       $name          Name of the queue
   * @param string|array $id            Task id (or an array of ids)
   * @param integer      $timeout       Timeout for enqueuing (seconds)
   * @param integer      $afterInterval Delay (seconds) added to the current time to form the score
   * @return boolean                    true if enqueued, false on invalid arguments or lock failure
   */
  public function enqueue($name, $id, $timeout = 10, $afterInterval = 0) {
    // Validate the arguments
    if (empty($name) || empty($id) || $timeout <= 0) return false;

    // Acquire the lock
    if (!$this->_redis->lock->lock("Queue:{$name}", $timeout)) {
      Logger::get('queue')->error("enqueue failed because of lock failure: name = $name, id = $id");
      return false;
    }
    
    // Use the current timestamp (plus the delay) as the score
    $score = microtime(true) + $afterInterval;
    // Enqueue
    foreach ((array)$id as $item) {
      // Only add the id if it is not already in the queue
      if (false === $this->_redis->zset->getScore("Queue:$name", $item)) {
        $this->_redis->zset->add("Queue:$name", $score, $item);
      }
    }
    
    // Release the lock
    $this->_redis->lock->unlock("Queue:$name");

    return true;

  }

  /**
   * Dequeue a task; $id and $score must both be given
   * If $score matches the score in the queue the task is dequeued; otherwise the task is considered to have been enqueued again and the operation is treated as a failure
   *
   * @param string  $name    Name of the queue
   * @param string  $id      Task id
   * @param float   $score   Score of the task, returned together with the task when it is fetched from the queue; the task is dequeued only if $score matches the value in the queue
   * @param integer $timeout Timeout for acquiring the lock (seconds)
   * @return boolean         Whether the task was dequeued; false can mean the Redis operation failed, or that $score does not match the value in the queue (the task was enqueued again by another thread after it was fetched)
   */
  public function dequeue($name, $id, $score, $timeout = 10) {
    // Validate the arguments
    if (empty($name) || empty($id) || empty($score)) return false;
    
    // Acquire the lock
    if (!$this->_redis->lock->lock("Queue:$name", $timeout)) {
      Logger::get('queue')->error("dequeue failed because of lock failure: name = $name, id = $id");
      return false;
    }
    
    // Dequeue
    // First read the score stored in Redis
    $serverScore = $this->_redis->zset->getScore("Queue:$name", $id);
    $result = false;
    // Check whether the score passed in equals the score in Redis
    if ($serverScore == $score) {
      // Delete the $id
      $result = (bool)$this->_redis->zset->delete("Queue:$name", $id);
      if ($result == false) {
        Logger::get('queue')->error("dequeue failed because of redis delete failure: name = $name, id = $id");
      }
    }
    // Release the lock
    $this->_redis->lock->unlock("Queue:$name");

    return $result;
  }

  /**
   * Fetch several tasks from the top of the queue and dequeue them
   * @param string  $name    Name of the queue
   * @param integer $count   Number of tasks to fetch
   * @param integer $timeout Timeout for acquiring the lock (seconds)
   * @return array|false     Array of the form [0=>['id'=> , 'score'=> ], 1=>['id'=> , 'score'=> ], 2=>['id'=> , 'score'=> ]]; when $count is 1, a single ['id'=> , 'score'=> ] entry, or false if nothing was fetched
   */
  public function pop($name, $count = 1, $timeout = 10) {
    // Validate the arguments
    if (empty($name) || $count <= 0) return []; 
    
    // Acquire the lock
    if (!$this->_redis->lock->lock("Queue:$name", $timeout)) {
      Logger::get('queue')->error("pop failed because of lock failure: name = $name, count = $count");
      return false;
    }
    
    // Fetch several tasks
    $result = [];
    $array = $this->_redis->zset->getByScore("Queue:$name", false, microtime(true), true, false, [0, $count]);

    // Put them in the $result array and delete the corresponding ids from Redis
    foreach ($array as $id => $score) {
      $result[] = ['id'=>$id, 'score'=>$score];
      $this->_redis->zset->delete("Queue:$name", $id);
    }

    // Release the lock
    $this->_redis->lock->unlock("Queue:$name");

    return $count == 1 ? (empty($result) ? false : $result[0]) : $result;
  }

  /**
   * Fetch several tasks from the top of the queue without dequeuing them
   * @param string  $name  Name of the queue
   * @param integer $count Number of tasks to fetch
   * @return array|false   Array of the form [0=>['id'=> , 'score'=> ], 1=>['id'=> , 'score'=> ], 2=>['id'=> , 'score'=> ]]; when $count is 1, a single ['id'=> , 'score'=> ] entry, or false if nothing was fetched
   */
  public function top($name, $count = 1) {
    // Validate the arguments
    if (empty($name) || $count < 1) return [];

    // Fetch several tasks
    $result = [];
    $array = $this->_redis->zset->getByScore("Queue:$name", false, microtime(true), true, false, [0, $count]);
    
    // Put the tasks in an array
    foreach ($array as $id => $score) {
      $result[] = ['id'=>$id, 'score'=>$score];
    }

    // Return the array (or a single entry when $count is 1)
    return $count == 1 ? (empty($result) ? false : $result[0]) : $result;    
  }
}

That completes the basic walkthrough of the two features. For the task queue, you can write a shell script that has the server run a program at regular intervals to perform the enqueue and dequeue operations. I will not go into a concrete application here; a good understanding of the ideas behind the two features is enough. The code is written in PHP, but once you understand the approach you can implement both features in Java, .NET, or any other language. These two features have a great many application scenarios, especially flash sales; another is grabbing train tickets during the Spring Festival rush. Those are the two most vivid examples. There are of course many more, so I will not list them one by one here.

All right, that's the end of this summary and sharing. The complete RedisLock class is listed in full in section 3 above; the task queue class is attached once more here:


/**
 *  Task queue 
 */
class RedisQueue {
  private $_redis;

  public function __construct($param = null) {
    $this->_redis = RedisFactory::get($param);
  }

  /**
   * Enqueue a task
   * @param string       $name          Name of the queue
   * @param string|array $id            Task id (or an array of ids)
   * @param integer      $timeout       Timeout for enqueuing (seconds)
   * @param integer      $afterInterval Delay (seconds) added to the current time to form the score
   * @return boolean                    true if enqueued, false on invalid arguments or lock failure
   */
  public function enqueue($name, $id, $timeout = 10, $afterInterval = 0) {
    // Validate the arguments
    if (empty($name) || empty($id) || $timeout <= 0) return false;

    // Acquire the lock
    if (!$this->_redis->lock->lock("Queue:{$name}", $timeout)) {
      Logger::get('queue')->error("enqueue failed because of lock failure: name = $name, id = $id");
      return false;
    }
    
    // Use the current timestamp (plus the delay) as the score
    $score = microtime(true) + $afterInterval;
    // Enqueue
    foreach ((array)$id as $item) {
      // Only add the id if it is not already in the queue
      if (false === $this->_redis->zset->getScore("Queue:$name", $item)) {
        $this->_redis->zset->add("Queue:$name", $score, $item);
      }
    }
    
    // Release the lock
    $this->_redis->lock->unlock("Queue:$name");

    return true;

  }

  /**
   * Dequeues a task; both $id and $score must be given.
   * The task is removed only if $score matches the score in the queue; otherwise the task is considered to have been re-enqueued and the operation is treated as a failure
   * 
   * @param string $name    Queue name
   * @param mixed $id     Task id
   * @param float $score   The task's score, as returned when the task was read from the queue; the task is removed only if it matches the score in the queue
   * @param integer $timeout Timeout (seconds)
   * @return boolean     Whether the task was dequeued. false may mean the Redis operation failed, or that $score does not match the score in the queue (i.e. the task was re-enqueued by another thread after it was read locally)
   */
  public function dequeue($name, $id, $score, $timeout = 10) {
    // Validate arguments
    if (empty($name) || empty($id) || empty($score)) return false;
    
    // lock 
    if (!$this->_redis->lock->lock("Queue:$name", $timeout)) {
      Logger::get('queue')->error("dequeue failed because of lock failure: name = $name, id = $id");
      return false;
    }
    
    // Dequeue
    // First read the score stored in Redis
    $serverScore = $this->_redis->zset->getScore("Queue:$name", $id);
    $result = false;
    // Check whether the score passed in matches the score in Redis
    if ($serverScore == $score) {
      // Remove $id from the queue
      $result = (bool)$this->_redis->zset->delete("Queue:$name", $id);
      if ($result == false) {
        Logger::get('queue')->error("dequeue failed because of redis delete failure: name = $name, id = $id");
      }
    }
    // unlock 
    $this->_redis->lock->unlock("Queue:$name");

    return $result;
  }

  /**
   * Fetches several tasks from the front of the queue and removes them from it
   * @param string $name    Queue name
   * @param integer $count   Number of tasks
   * @param integer $timeout  Timeout (seconds)
   * @return array|boolean   For $count > 1, an array of the form [0=>['id'=> , 'score'=> ], 1=>['id'=> , 'score'=> ], 2=>['id'=> , 'score'=> ]]; for $count == 1, a single ['id'=> , 'score'=> ] or false if no task is available; false on lock failure
   */
  public function pop($name, $count = 1, $timeout = 10) {
    // Validate arguments
    if (empty($name) || $count <= 0) return []; 
    
    // lock 
    if (!$this->_redis->lock->lock("Queue:$name", $timeout)) {
      Logger::get('queue')->error("pop failed because of lock failure: name = $name, count = $count");
      return false;
    }
    
    // Take out up to $count tasks
    $result = [];
    $array = $this->_redis->zset->getByScore("Queue:$name", false, microtime(true), true, false, [0, $count]);

    // Copy each task into $result and delete its id from Redis
    foreach ($array as $id => $score) {
      $result[] = ['id'=>$id, 'score'=>$score];
      $this->_redis->zset->delete("Queue:$name", $id);
    }

    // unlock 
    $this->_redis->lock->unlock("Queue:$name");

    return $count == 1 ? (empty($result) ? false : $result[0]) : $result;
  }

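  /**
   * Fetches several tasks from the front of the queue without removing them
   * @param string $name  Queue name
   * @param integer $count  Number of tasks
   * @return array|boolean  For $count > 1, an array of the form [0=>['id'=> , 'score'=> ], 1=>['id'=> , 'score'=> ], 2=>['id'=> , 'score'=> ]]; for $count == 1, a single ['id'=> , 'score'=> ] or false if no task is available
   */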
  public function top($name, $count = 1) {
    // Validate arguments
    if (empty($name) || $count < 1) return [];

    // Fetch up to $count tasks
    $result = [];
    $array = $this->_redis->zset->getByScore("Queue:$name", false, microtime(true), true, false, [0, $count]);
    
    // Put the tasks into an array
    foreach ($array as $id => $score) {
      $result[] = ['id'=>$id, 'score'=>$score];
    }

    // Return a single task when $count is 1, otherwise the array of tasks
    return $count == 1 ? (empty($result) ? false : $result[0]) : $result;    
  }
}
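
The score check in dequeue() is easier to follow with a small example that needs no Redis server. The following is only a sketch: InMemoryDelayQueue is a hypothetical stand-in for the sorted set, not part of the classes above, and the current time is passed in as $now so the behaviour is repeatable. It assumes, as the code above suggests, that a task's score is the time at which it becomes due.

```php
<?php
// Hypothetical in-memory stand-in for the Redis sorted set used by RedisQueue.
class InMemoryDelayQueue {
  // Task id => score (the time at which the task becomes due)
  private $items = [];

  // Add a task unless it is already queued; returns the score stored for it.
  public function enqueue($id, $now, $afterInterval = 0) {
    if (!isset($this->items[$id])) {
      $this->items[$id] = $now + $afterInterval;
    }
    return $this->items[$id];
  }

  // Return up to $count due tasks (score <= $now), lowest score first, without removing them.
  public function top($now, $count = 1) {
    $due = array_filter($this->items, function ($score) use ($now) {
      return $score <= $now;
    });
    asort($due);
    $result = [];
    foreach (array_slice($due, 0, $count, true) as $id => $score) {
      $result[] = ['id' => $id, 'score' => $score];
    }
    return $result;
  }

  // Remove a task only if the caller's score still matches the stored one.
  public function dequeue($id, $score) {
    if (!isset($this->items[$id]) || $this->items[$id] != $score) {
      return false;
    }
    unset($this->items[$id]);
    return true;
  }
}

$queue = new InMemoryDelayQueue();
$first = $queue->enqueue('order:1', 100.0);      // due immediately
$queue->enqueue('order:2', 100.0, 30);           // due 30 seconds later

$dueNow = $queue->top(100.0, 10);                // only order:1 is due at t = 100
$dueLater = $queue->top(130.0, 10);              // both tasks are due at t = 130

// Another worker removes order:1 and re-enqueues it at t = 105
$queue->dequeue('order:1', $first);
$second = $queue->enqueue('order:1', 105.0);

$staleResult = $queue->dequeue('order:1', $first);   // the old score no longer matches
$freshResult = $queue->dequeue('order:1', $second);  // the current score matches
```

A worker holding the old score cannot remove a task that has since been re-enqueued, which is the situation the $score parameter of dequeue() guards against. The real class additionally wraps each operation in the distributed lock, which this sketch leaves out.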

That is all for this article; I hope it helps with your studies.

