Seckill function based on a Redis distributed lock
- 2020-06-07 05:33:11
- OfStack
Recently, I encountered a business scenario similar to a "seckill" (flash sale) in a project. In this blog, I will use a very simple demo to illustrate the basic idea behind implementing a seckill.
The business scenario
From a business perspective, a seckill is a short window of time in which many users "scramble" for a resource; in most seckill scenarios the resource is a commodity. From a technical perspective, a seckill is many threads operating on a shared resource. To implement one, we must control how threads contend for the resource, ensuring both efficient concurrency and correct operation.
Some possible implementations
As mentioned earlier, the key to implementing a seckill is controlling thread contention for resources. With basic knowledge of threads, the following approaches immediately come to mind:
1. Abstracted technically, the seckill is a method. Inside this method, the operations might include decrementing the commodity's stock, adding the commodity to the user's shopping cart, and so on. The simplest and most direct implementation is to add the synchronized keyword to this method, which in plain terms locks the entire method.
2. Locking the whole method is simple and convenient but a little crude. A slight optimization is to lock only the critical blocks of code, such as the part that writes to the database.
3. Since there is a concurrency problem, we can simply remove the concurrency: manage all threads with a queue so the operations become serial, and the concurrency problem naturally disappears.
All of the above methods work, but none of them is good. Why? Methods 1 and 2 are essentially "locking", but the lock granularity is still too coarse. What does that mean? Consider the first approach: if two threads execute the seckill method at the same time but operate on different commodities, the business logic allows them to proceed simultaneously. With method 1 or 2, however, the two threads still fight over the same lock, which is unnecessary. The third approach does not solve this problem either.
So how do we control the lock at a finer granularity? Consider setting a mutex for each commodity, identified by a string derived from the commodity ID, so that only threads competing for the same commodity exclude each other, rather than all threads excluding one another. A distributed lock can help us solve this problem.
What is distributed locking
A distributed lock is a way to control synchronized access to shared resources across a distributed system. In distributed systems, it is often necessary to coordinate actions. If a resource, or a set of resources, is shared between different systems, or between different hosts of the same system, mutual exclusion is often needed to prevent interference and ensure consistency when accessing those resources. This is where distributed locks come in.
Let's assume the simplest seckill scenario: there is a table in the database whose columns are the commodity ID and the commodity's inventory; a successful seckill decrements the inventory for that commodity ID by 1. Now suppose 1,000 threads seckill two commodities: 500 threads seckill the first, and 500 threads seckill the second. Let's use this simple business scenario to explain distributed locks.
In general, business systems with seckill scenarios are complex, carrying huge traffic and high concurrency. Such systems often adopt a distributed architecture to balance the load. The 1,000 concurrent requests will come from different machines, the commodity inventory is the shared resource, and the 1,000 requests compete for it, so we need mutual exclusion that works across machines. This is where the distributed lock applies.
Key-value storage systems such as Redis are important tools for building distributed locks thanks to their features.
Concrete implementation
Let's start with some basic redis commands:
SETNX key value
If key does not exist, set it to the given string value; in this case the command behaves like SET. If key already exists, do nothing. SETNX stands for "SET if Not eXists".
EXPIRE key seconds
Set an expiration time for key. Once key has expired, it is deleted automatically.
DEL key
Delete key.
Since this implementation uses only these three commands, I will introduce only these three. For more commands and for Redis features and usage, refer to the Redis website.
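To make the semantics of these three commands concrete without a running Redis server, here is a minimal in-memory stand-in. This is only a sketch: the class name MiniKv and its methods are mine, for illustration; they are not the real Redis or jedis API.

```java
import java.util.concurrent.ConcurrentHashMap;

// Illustrative in-memory stand-in for the three Redis commands used in this article.
// It only mimics the SETNX / EXPIRE / DEL semantics so the locking idea can be
// followed without a server; it is NOT Redis.
public class MiniKv {
    private final ConcurrentHashMap<String, String> store = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<String, Long> expiries = new ConcurrentHashMap<>();

    // SETNX key value: set only if absent; returns 1 on success, 0 otherwise
    public int setnx(String key, String value) {
        evictIfExpired(key);
        return store.putIfAbsent(key, value) == null ? 1 : 0;
    }

    // EXPIRE key seconds: schedule automatic deletion
    public void expire(String key, int seconds) {
        expiries.put(key, System.currentTimeMillis() + seconds * 1000L);
    }

    // DEL key: remove the key and any pending expiry
    public void del(String key) {
        store.remove(key);
        expiries.remove(key);
    }

    // lazily evict a key whose deadline has passed
    private void evictIfExpired(String key) {
        Long deadline = expiries.get(key);
        if (deadline != null && System.currentTimeMillis() > deadline) {
            del(key);
        }
    }
}
```

With this stand-in, "locking" a commodity is `setnx("lock:" + commodityId, "LOCKED") == 1`, and "unlocking" is `del("lock:" + commodityId)`, exactly the pattern the RedisLock class below uses against real Redis.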
Questions to consider
1. How do we access Redis? The jedis client is provided for Java applications; we simply call the jedis API directly.
2. How do we implement locking? A "lock" is an abstract concept; to make it concrete, we store a key-value pair in Redis. The key is a unique string derived from the commodity ID; the value is unimportant, because as long as the key-value pair exists, the commodity is locked.
3. How do we release the lock? Since the presence of the key-value pair means the lock is held, releasing the lock simply means deleting the key-value pair from Redis.
4. Blocking or non-blocking? This implementation is blocking: if a thread finds the lock already held, it keeps polling the lock within a given time window.
5. How do we handle abnormal situations? For example, a thread locks a commodity but, for whatever reason, never completes the operation (in the business scenario above, it never writes the decremented inventory to the database), and therefore never releases the lock. To handle this, a lock timeout is added: the Redis EXPIRE command sets a timeout on the key, and after the timeout Redis deletes the key automatically, forcibly releasing the lock. (The timeout release can be thought of as an asynchronous operation performed by Redis itself; the application only needs to choose a timeout appropriate to the system.)
Talk is cheap, show me the code
At the code level, annotations mark the concurrency-prone methods and their parameters. A dynamic proxy reads the annotated method and parameters, acquires the lock, invokes the proxied method, and releases the lock afterwards.
First, a few annotation definitions.
CacheLock is a method-level annotation used to mark methods that may cause concurrency problems:
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface CacheLock {
    String lockedPrefix() default "";  // prefix of the redis lock key
    long timeOut() default 2000;       // how long to poll for the lock, in milliseconds
    int expireTime() default 1000;     // lifetime of the key in redis, in seconds
}
LockedObject is a parameter-level annotation used to mark parameters of basic types, such as the commodity ID:
@Target(ElementType.PARAMETER)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface LockedObject {
    // no value needed
}
LockedComplexObject is also a parameter-level annotation used to annotate parameters of a custom type:
@Target(ElementType.PARAMETER)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface LockedComplexObject {
    // name of the member to lock on inside a complex object,
    // e.g. the commodity ID field of a commodity object
    String field() default "";
}
CacheLockInterceptor implements the InvocationHandler interface. In its invoke method it reads the annotations from the method and its parameters, acquires the lock before executing the annotated method, and releases it afterwards:
public class CacheLockInterceptor implements InvocationHandler {
    public static int ERROR_COUNT = 0;
    private Object proxied;

    public CacheLockInterceptor(Object proxied) {
        this.proxied = proxied;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        CacheLock cacheLock = method.getAnnotation(CacheLock.class);
        // no CacheLock annotation: invoke the method directly
        if (null == cacheLock) {
            System.out.println("no cacheLock annotation");
            return method.invoke(proxied, args);
        }
        // get the annotations of the method's parameters
        Annotation[][] annotations = method.getParameterAnnotations();
        // find the parameter to lock on from the annotations and argument list
        Object lockedObject = getLockedObject(annotations, args);
        String objectValue = lockedObject.toString();
        // create a lock for this object
        RedisLock lock = new RedisLock(cacheLock.lockedPrefix(), objectValue);
        // try to acquire the lock
        boolean result = lock.lock(cacheLock.timeOut(), cacheLock.expireTime());
        if (!result) { // failed to acquire the lock
            ERROR_COUNT += 1;
            throw new CacheLockException("get lock fail");
        }
        try {
            // lock acquired: execute the method
            return method.invoke(proxied, args);
        } finally {
            lock.unlock(); // release the lock
        }
    }
    /**
     * Finds the argument to lock on, based on the parameter annotations.
     * @param annotations the annotations of each parameter
     * @param args the argument list
     * @return the object whose string form identifies the lock
     * @throws CacheLockException if no locked parameter can be determined
     */
    private Object getLockedObject(Annotation[][] annotations, Object[] args) throws CacheLockException {
        if (null == args || args.length == 0) {
            throw new CacheLockException("method has no arguments, nothing to lock on");
        }
        if (null == annotations || annotations.length == 0) {
            throw new CacheLockException("no annotated parameters");
        }
        // locking on multiple parameters is not supported: only the first parameter
        // annotated with LockedObject or LockedComplexObject is used
        int index = -1; // position of the locked parameter
        for (int i = 0; i < annotations.length; i++) {
            for (int j = 0; j < annotations[i].length; j++) {
                if (annotations[i][j] instanceof LockedComplexObject) { // annotated with LockedComplexObject
                    index = i;
                    try {
                        // read the value of the annotated field from the argument
                        Field field = args[i].getClass().getDeclaredField(((LockedComplexObject) annotations[i][j]).field());
                        field.setAccessible(true);
                        return field.get(args[i]);
                    } catch (NoSuchFieldException | SecurityException | IllegalAccessException e) {
                        throw new CacheLockException("annotated object has no field named " + ((LockedComplexObject) annotations[i][j]).field());
                    }
                }
                if (annotations[i][j] instanceof LockedObject) {
                    index = i;
                    break;
                }
            }
            // stop after the first match: multi-parameter locking is not supported
            if (index != -1) {
                break;
            }
        }
        if (index == -1) {
            throw new CacheLockException("please annotate the parameter to lock on");
        }
        return args[index];
    }
}
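The interceptor above relies on JDK dynamic proxies. Stripped of the locking logic, the pattern looks like the minimal, self-contained sketch below: the handler intercepts every interface call, can run code before and after (the lock and unlock would go in those slots), then delegates to the real object. The Greeter interface and ProxyDemo class are hypothetical names used only for illustration.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

// a toy interface standing in for SeckillInterface
interface Greeter {
    String greet(String name);
}

public class ProxyDemo {
    // wrap a target object in a dynamic proxy, as CacheLockInterceptor does
    public static Greeter wrap(Greeter target) {
        return (Greeter) Proxy.newProxyInstance(
                Greeter.class.getClassLoader(),
                new Class[]{Greeter.class},
                new InvocationHandler() {
                    @Override
                    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                        // the lock would be acquired here
                        try {
                            return method.invoke(target, args); // delegate to the real object
                        } finally {
                            // the lock would be released here
                        }
                    }
                });
    }
}
```

Every call on the returned proxy funnels through invoke, which is what lets the framework centralize lock handling without touching the business code.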
The most critical methods of the RedisLock class are lock and unlock:
/**
 * lock
 * Usage:
 *   lock();
 *   try {
 *       executeMethod();
 *   } finally {
 *       unlock();
 *   }
 * @param timeout time window, in milliseconds, within which to poll for the lock
 * @param expire expiry time of the lock key in redis, in seconds
 * @return true on success, false on failure
 */
public boolean lock(long timeout, int expire) {
    long nanoTime = System.nanoTime();
    timeout *= MILLI_NANO_TIME; // convert milliseconds to nanoseconds
    try {
        // keep polling for the lock within the timeout window
        while (System.nanoTime() - nanoTime < timeout) {
            // if the lock key does not exist, set it and its expiry time, i.e. acquire the lock
            if (this.redisClient.setnx(this.key, LOCKED) == 1) {
                // the expiry time guarantees the lock is eventually released even if
                // the holder never releases it, so no thread blocks forever
                this.redisClient.expire(key, expire);
                this.lock = true;
                return this.lock;
            }
            System.out.println("lock wait");
            // sleep briefly, with a random component, to avoid a possible livelock
            Thread.sleep(3, RANDOM.nextInt(30));
        }
    } catch (Exception e) {
        throw new RuntimeException("locking error", e);
    }
    return false;
}
public void unlock() {
    try {
        if (this.lock) {
            redisClient.delKey(key); // simply delete the key
        }
    } catch (Throwable e) {
        // ignore: if deletion fails, the key will expire on its own
    }
}
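One caveat worth noting: lock() issues SETNX and EXPIRE as two separate commands, so if the process dies between them the key never expires and the lock leaks. Later Redis versions close this gap with a single atomic command, SET key value NX PX &lt;milliseconds&gt;. The sketch below mimics that one-shot "set if absent, with expiry" semantics in memory to make the difference concrete; the class and method names are mine, not part of the article's code.

```java
import java.util.concurrent.ConcurrentHashMap;

// In-memory sketch of the atomic "SET key value NX PX <ms>" semantics:
// the existence check, the set, and the expiry are one indivisible step.
public class AtomicLockSketch {
    // maps lock key -> expiry deadline in epoch milliseconds
    private final ConcurrentHashMap<String, Long> locks = new ConcurrentHashMap<>();

    // rough equivalent of: SET key token NX PX expireMillis -> true if acquired
    public boolean tryLock(String key, long expireMillis) {
        long now = System.currentTimeMillis();
        final boolean[] acquired = {false};
        // compute() runs atomically per key: check expiry and set in one step
        locks.compute(key, (k, deadline) -> {
            if (deadline == null || deadline < now) {
                acquired[0] = true;
                return now + expireMillis; // take the lock with a fresh deadline
            }
            return deadline; // still held by someone else
        });
        return acquired[0];
    }

    public void unlock(String key) {
        locks.remove(key);
    }
}
```

Against real Redis the same effect is a single SET command with the NX and PX options, which removes the crash window entirely.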
The code above is the framework. Now let's use this simple framework to write a seckill function.
First, define an interface containing a seckill method:
public interface SeckillInterface {
    /**
     * For now, annotations are only supported on interface methods.
     */
    // methods annotated with CacheLock may be executed concurrently
    @CacheLock(lockedPrefix = "TEST_PREFIX")
    // The simplest seckill method: the parameters are the user ID and the commodity ID.
    // Multiple threads may compete for the same commodity, so the commodity ID
    // carries the LockedObject annotation.
    public void secKill(String userID, @LockedObject Long commidityID);
}
An implementation class of SeckillInterface, i.e. the concrete seckill implementation:
public class SecKillImpl implements SeckillInterface {
    static Map<Long, Long> inventory;
    static {
        inventory = new HashMap<>();
        inventory.put(10000001L, 10000L);
        inventory.put(10000002L, 10000L);
    }

    @Override
    public void secKill(String arg1, Long arg2) {
        // the simplest possible seckill, only as a demo
        reduceInventory(arg2);
    }

    // Simulated seckill: here a successful seckill just decrements the inventory by 1.
    // Real scenarios are far more complicated.
    public Long reduceInventory(Long commodityId) {
        inventory.put(commodityId, inventory.get(commodityId) - 1);
        return inventory.get(commodityId);
    }
}
Simulate the seckill scenario: 1,000 threads compete for two commodities:
@Test
public void testSecKill() {
    int threadCount = 1000;
    int splitPoint = 500;
    CountDownLatch endCount = new CountDownLatch(threadCount);
    CountDownLatch beginCount = new CountDownLatch(1);
    SecKillImpl testClass = new SecKillImpl();
    Thread[] threads = new Thread[threadCount];
    // commidityId1 and commidityId2 are fields of the test class, matching the
    // keys preloaded in SecKillImpl (10000001L and 10000002L)
    // the first 500 threads seckill commodity 1
    for (int i = 0; i < splitPoint; i++) {
        threads[i] = new Thread(new Runnable() {
            public void run() {
                try {
                    // wait on the begin latch so all threads start together
                    beginCount.await();
                    // invoke secKill through the dynamic proxy
                    SeckillInterface proxy = (SeckillInterface) Proxy.newProxyInstance(
                            SeckillInterface.class.getClassLoader(),
                            new Class[]{SeckillInterface.class},
                            new CacheLockInterceptor(testClass));
                    proxy.secKill("test", commidityId1);
                    endCount.countDown();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        threads[i].start();
    }
    // the remaining 500 threads seckill commodity 2
    for (int i = splitPoint; i < threadCount; i++) {
        threads[i] = new Thread(new Runnable() {
            public void run() {
                try {
                    // wait on the begin latch so all threads start together
                    beginCount.await();
                    // invoke secKill through the dynamic proxy
                    SeckillInterface proxy = (SeckillInterface) Proxy.newProxyInstance(
                            SeckillInterface.class.getClassLoader(),
                            new Class[]{SeckillInterface.class},
                            new CacheLockInterceptor(testClass));
                    proxy.secKill("test", commidityId2);
                    endCount.countDown();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        threads[i].start();
    }
    long startTime = System.currentTimeMillis();
    // release the begin latch so all 1000 threads run at the same time,
    // which is what makes the test meaningful
    beginCount.countDown();
    try {
        // the main thread waits for all worker threads to finish
        endCount.await();
        // check whether the result is correct
        System.out.println(SecKillImpl.inventory.get(commidityId1));
        System.out.println(SecKillImpl.inventory.get(commidityId2));
        System.out.println("error count " + CacheLockInterceptor.ERROR_COUNT);
        System.out.println("total cost " + (System.currentTimeMillis() - startTime));
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}
If everything works correctly, the inventory of each commodity should drop by exactly 500. Repeated tests matched this expectation. Without the locking mechanism, the inventory typically drops by only 498 or 499, because concurrent updates get lost.
The dynamic-proxy approach obtains the distributed lock ID via annotations and reflection, then performs the locking and unlocking. Of course, you could do this directly inside the method; the dynamic proxy is used to centralize the lock-handling code in the proxy for easier maintenance.
Since typical seckill scenarios occur in web projects, consider using Spring's AOP support to put the lock-handling code in an aspect, although AOP is essentially dynamic proxying too.
Summary
Starting from the business scenario, this article explained how to use Redis to implement a distributed lock and build a simple seckill function, and recorded the author's thought process along the way. I hope it gives readers some inspiration.
Source repository