Second killing function based on redis distributed lock

  • 2020-06-07 05:33:11
  • OfStack

Recently, I encountered a business scenario similar to "seckill" in a project. In this blog, I will use a very simple demo to illustrate the basic idea of implementing the so-called "seckill".

The business scenario

The so-called seckill, from the perspective of business, is a short period of time for multiple users to "scramble" for resources, resources here in most of the seckill scenes are commodities; From the perspective of business abstraction and technology, seckilling is the operation of multiple threads on resources. Therefore, to realize seckilling, it is necessary to control the contention of threads on resources to ensure both efficient concurrency and correct operation.

1 Some possible implementations

As mentioned earlier, the key to realizing seckill is to control the contention of threads for resources. Based on basic thread knowledge, you can think of the following methods without thinking:
1. The technical abstraction of seckill should be 1 method. In this method, the possible operation is to stock the goods -1, add the goods to the user's shopping cart and so on. The simplest and most straightforward implementation is to add the synchronized keyword to this method, which in layman's terms locks the entire method.
2. Lock the whole method This strategy is simple and convenient, but it seems a little rough. You can optimize 1 slightly to lock only blocks of code that kill seconds, such as the part that writes to the database.
3, since there is a concurrency problem, then I let him "not concurrent", all threads with a queue management, make it become serial operation, naturally there will be no concurrency problem.

All of the above methods are effective, but none of them are good. Why is that? Methods 1 and 2 are essentially "locking," but lock granularity is still relatively high. What do you mean? In the first scenario, if two threads execute the seckill method at the same time, the two threads are operating on different commodities, so it should be possible to simultaneously perform the operation from the business perspective. However, if the 12th method is adopted, the two threads will also fight for the same lock, which is actually unnecessary. The third approach does not solve the problem mentioned above.

So how do you control locks at a finer level of granularity? Consider setting a mutex for each item, identified only by the string associated with the item ID, so that only threads competing for the same item are mutex exclusive, not all threads mutex exclusive. Distributed locking can help us solve this problem.

What is distributed locking

Distributed locking is a way to control synchronous access to Shared resources between distributed systems. In distributed systems, it is often necessary to coordinate their actions. If one or a set of resources are Shared between different systems or hosts of the same system, mutual exclusion is often needed to prevent interference between each other to ensure the 1 consistency when accessing these resources. In this case, distributed locks are needed.

Let's assume the simplest scenario of seckilling: there is a table in the database. column is the inventory of goods ID and ID respectively. The inventory of goods ID will be -1 if the seckilling is successful. Now let's say I have 1000 threads killing 2 items, 500 threads killing the first item, and 500 threads killing the second item. Let's explain 1 distributed lock in terms of this simple business scenario.
In general, business systems with seckill scenarios are complex, carrying a huge amount of traffic and a high amount of concurrency. Such systems often adopt a distributed architecture to balance the load. The 1000 concurrences will come from different places, the inventory of goods will be the Shared resource, and the 1000 concurrences will compete for resources, so we need to manage the concurrency mutex. This is the application of distributed locking.
key-value storage systems, such as redis, are important tools for distributed locking because of their 1 features.

Concrete implementation

Let's start with some basic redis commands:
SETNX key value
If key does not exist, set key to the corresponding string value. In this case, the command looks like SET1. When key already exists, nothing is done. SETNX is "SET if Not eXists".
expire KEY seconds
Set the expiration time for key. If key has expired, it will be deleted automatically.
del KEY
Delete key
Since the author's implementation only used these three commands, I will only introduce these three commands, more commands and the features and use of redis, you can refer to the redis website.

Questions to consider

1. What is redis used? Thanks to redis, jedis has provided the jedis client for the java application. Call jedis API directly.
2. How to realize locking? "Lock" is actually an abstract concept, and to turn this abstract concept into something concrete, is an ES75en-ES76en pair stored in redis. key is the only string associated with the commodity ID to identify the 1. value is not important, because as long as the unique key-ES81en exists, the commodity is already locked.
3. How to release the lock? Since the presence of key-ES84en pairs means that the lock is locked, releasing the lock is naturally removing the ES86en-ES87en pair from redis.
4. Blocked or non-blocked? The author USES a blocking implementation in which if a thread finds that it is locked, it polls the lock for a specific period of time.
5. How to handle abnormal situations? Such as a thread to a commodity on the lock, but due to various reasons, there is no complete operations (in the above business scenario is that there is no written to the database will inventory - 1), natural no lock is released, the author joined the lock timeout mechanism, this situation using redis expire command to key set the timeout value, after a timeout redis would this key automatically deleted, or forced to release the lock (can be thought of as a timeout releases the lock is an asynchronous operation, performed by redis, application you just need to according to the system characteristic set the timeout).

talk is cheap,show me the code

At the code implementation level, the annotation has concurrent methods and parameters, the method and parameters of the annotation are obtained through the dynamic proxy, the agent is locked, and the lock is released after executing the method of the agent.

A few annotations define:

cachelock is a method level annotation used to annotate methods that cause concurrency problems:


@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface CacheLock {
 String lockedPrefix() default "";//redis  The lock key The prefix 
 long timeOut() default 2000;// Poll the lock for time 
 int expireTime() default 1000;//key in redis The time in existence, 1000S
}

lockedObject is a parameter-level annotation used to annotate basic types of parameters such as commodity ID:


@Target(ElementType.PARAMETER)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface LockedObject {
 // Don't need to value 
}

LockedComplexObject is also a parameter-level annotation used to annotate parameters of a custom type:


@Target(ElementType.PARAMETER)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface LockedComplexObject {
 String field() default "";// A member variable that needs to be locked in a complex object that contains a member variable, such as 1 The commodity of commodity object ID

}

CacheLockInterceptor implements the InvocationHandler interface, obtains the method and parameters of the annotation in the invoke method, locks the method before executing the annotation, and releases the lock after executing the annotated method:


public class CacheLockInterceptor implements InvocationHandler{
 public static int ERROR_COUNT = 0;
 private Object proxied;

 public CacheLockInterceptor(Object proxied) {
 this.proxied = proxied;
 }

 @Override
 public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {

 CacheLock cacheLock = method.getAnnotation(CacheLock.class);
 // There is no cacheLock Annotations, pass
 if(null == cacheLock){
  System.out.println("no cacheLock annotation");  
  return method.invoke(proxied, args);
 }
 // Gets a comment for a parameter in a method 
 Annotation[][] annotations = method.getParameterAnnotations();
 // Gets the locked parameter based on the parameter annotation and parameter list obtained 
 Object lockedObject = getLockedObject(annotations,args);
 String objectValue = lockedObject.toString();
 // new 1 A lock 
 RedisLock lock = new RedisLock(cacheLock.lockedPrefix(), objectValue);
 // lock 
 boolean result = lock.lock(cacheLock.timeOut(), cacheLock.expireTime());
 if(!result){// Take the lock failure 
  ERROR_COUNT += 1;
  throw new CacheLockException("get lock fail");

 }
 try{
  // Lock successfully. Execute method 
  return method.invoke(proxied, args);
 }finally{
  lock.unlock();// Release the lock 
 }

 }
 /**
 * 
 * @param annotations
 * @param args
 * @return
 * @throws CacheLockException
 */
 private Object getLockedObject(Annotation[][] annotations,Object[] args) throws CacheLockException{
 if(null == args || args.length == 0){
  throw new CacheLockException(" Method parameter is null and there is no locked object ");
 }

 if(null == annotations || annotations.length == 0){
  throw new CacheLockException(" Parameters that are not annotated ");
 }
 // Multiple parameter locking is not supported, only the 1 A note to lockedObject or lockedComplexObject The parameters of the 
 int index = -1;// Marks the position pointer of the parameter 
 for(int i = 0;i < annotations.length;i++){
  for(int j = 0;j < annotations[i].length;j++){
  if(annotations[i][j] instanceof LockedComplexObject){// Annotated as LockedComplexObject
   index = i;
   try {
   return args[i].getClass().getField(((LockedComplexObject)annotations[i][j]).field());
   } catch (NoSuchFieldException | SecurityException e) {
   throw new CacheLockException(" There is no attribute in the annotation object " + ((LockedComplexObject)annotations[i][j]).field());
   }
  }

  if(annotations[i][j] instanceof LockedObject){
   index = i;
   break;
  }
  }
  // Find the first 1 After a direct break Multi-parameter locking is not supported 
  if(index != -1){
  break;
  }
 }

 if(index == -1){
  throw new CacheLockException(" Please specify the locked parameter ");
 }

 return args[index];
 }
}

The most critical lock and unlock methods in the RedisLock class:


/**
 *  lock 
 *  Usage: 
 * lock();
 * try{
 * executeMethod();
 * }finally{
 * unlock();
 * }
 * @param timeout timeout Polling locks within the time range 
 * @param expire  Set the lock timeout period 
 * @return  successful  or  failure 
 */
 public boolean lock(long timeout,int expire){
 long nanoTime = System.nanoTime();
 timeout *= MILLI_NANO_TIME;
 try {
  // in timeout Constantly poll locks within the time range 
  while (System.nanoTime() - nanoTime < timeout) {
  // If the lock does not exist, set the lock and set the lock expiration time, that is, add the lock 
  if (this.redisClient.setnx(this.key, LOCKED) == 1) {
   this.redisClient.expire(key, expire);// The lock expiration time is set in order to be released without release 
   // In the case of a lock, the lock expires and disappears without permanent blocking 
   this.lock = true;
   return this.lock;
  }
  System.out.println(" Lock wait ");
  // Hibernate briefly to avoid possible live locks 
  Thread.sleep(3, RANDOM.nextInt(30));
  } 
 } catch (Exception e) {
  throw new RuntimeException("locking error",e);
 }
 return false;
 }

 public void unlock() {
 try {
  if(this.lock){
  redisClient.delKey(key);// Delete directly 
  }
 } catch (Throwable e) {

 }
 }

The above code is framework code, now let's show how to use the above simple framework to write 1 SEC killing function.

First, define 1 interface, in which 1 seckill method is defined:


public interface SeckillInterface {
/**
* For now, annotations are only supported on interface methods 
*/
 //cacheLock Annotations may generate concurrent methods 
 @CacheLock(lockedPrefix="TEST_PREFIX")
 public void secKill(String userID,@LockedObject Long commidityID);// The simplest method of killing seconds, the parameter is the user ID And the commodity ID . There may be multiple threads competing 1 Goods, so goods ID add LockedObject annotations 
}

The above SeckillInterface interface implementation class, namely the specific implementation of seckill:


public class SecKillImpl implements SeckillInterface{
 static Map<Long, Long> inventory ;
 static{
 inventory = new HashMap<>();
 inventory.put(10000001L, 10000l);
 inventory.put(10000002L, 10000l);
 }

 @Override
 public void secKill(String arg1, Long arg2) {
 // The simplest second kill, here only as demo The sample 
 reduceInventory(arg2);
 }
 // Simulated killing operation, let's say 1 To kill is to reduce inventory 1 The actual situation is much more complicated 
 public Long reduceInventory(Long commodityId){
 inventory.put(commodityId,inventory.get(commodityId) - 1);
 return inventory.get(commodityId);
 }

}

Simulated killing scene, 1000 threads competing for two goods:


@Test
 public void testSecKill(){
 int threadCount = 1000;
 int splitPoint = 500;
 CountDownLatch endCount = new CountDownLatch(threadCount);
 CountDownLatch beginCount = new CountDownLatch(1);
 SecKillImpl testClass = new SecKillImpl();

 Thread[] threads = new Thread[threadCount];
 // since 500 Three threads, second to kill 1 A commodity 
 for(int i= 0;i < splitPoint;i++){
  threads[i] = new Thread(new Runnable() {
  public void run() {
   try {
   // Wait for the 1 On the semaphore. Hang up 
   beginCount.await();
   // Call with a dynamic proxy secKill methods 
   SeckillInterface proxy = (SeckillInterface) Proxy.newProxyInstance(SeckillInterface.class.getClassLoader(), 
    new Class[]{SeckillInterface.class}, new CacheLockInterceptor(testClass));
   proxy.secKill("test", commidityId1);
   endCount.countDown();
   } catch (InterruptedException e) {
   // TODO Auto-generated catch block
   e.printStackTrace();
   }
  }
  });
  threads[i].start();

 }
 // comeback 500 Three threads, second to kill 2 items 
 for(int i= splitPoint;i < threadCount;i++){
  threads[i] = new Thread(new Runnable() {
  public void run() {
   try {
   // Wait for the 1 On the semaphore. Hang up 
   beginCount.await();
   // Call with a dynamic proxy secKill methods 
   SeckillInterface proxy = (SeckillInterface) Proxy.newProxyInstance(SeckillInterface.class.getClassLoader(), 
    new Class[]{SeckillInterface.class}, new CacheLockInterceptor(testClass));
   proxy.secKill("test", commidityId2);
   //testClass.testFunc("test", 10000001L);
   endCount.countDown();
   } catch (InterruptedException e) {
   // TODO Auto-generated catch block
   e.printStackTrace();
   }
  }
  });
  threads[i].start();

 }


 long startTime = System.currentTimeMillis();
 // The main thread releases the start semaphore and waits for the end semaphore to do so 1000 Two threads are executed at the same time to ensure the correctness of the test 
 beginCount.countDown();

 try {
  // The main thread waits for the end semaphore 
  endCount.await();
  // See if the result is correct 
  System.out.println(SecKillImpl.inventory.get(commidityId1));
  System.out.println(SecKillImpl.inventory.get(commidityId2));
  System.out.println("error count" + CacheLockInterceptor.ERROR_COUNT);
  System.out.println("total cost " + (System.currentTimeMillis() - startTime));
 } catch (InterruptedException e) {
  // TODO Auto-generated catch block
  e.printStackTrace();
 }
 }

Under the correct expectation, the inventory of each item should be reduced by 500. After many tests, the actual situation is as expected. If the locking mechanism is not used, there will be a 499,498 stock reduction.
The method of dynamic proxy is adopted here, and the distributed lock ID is obtained by annotation and reflection mechanism for locking and releasing. Of course, you can do this directly in a method, and the dynamic proxy is also used to centralize the lock operation code in the proxy for easy maintenance.
As the typical seckill scenario occurs in web projects, consider taking advantage of spring's AOP features to place the lock operation code in the section, although AOP is also a dynamic agent in nature.

summary

Starting from the business scenario, this article explains how to use redis to realize distributed locking and complete the simple killing function. It also records the thinking process of the author, hoping to give some inspiration to readers of this article.

Source warehouse


Related articles: