Three implementations of Java distributed locking

  • 2020-06-01 09:44:20
  • OfStack

Scenario 1: database optimistic lock

Optimistic locking is usually implemented with a data version mechanism. Take a red packet table (t_bonus) with a field (left_count) that records how many red packets remain. Each time a user claims one, left_count is reduced by 1. To guarantee that left_count never becomes negative under concurrency, optimistic locking adds a version number field (version) to the red packet table, with a default value of 0.

How the anomaly occurs without a lock


-- An anomaly that can occur without locking
-- Thread 1 queries; left_count is currently 1, so a row is returned
select * from t_bonus where id = 10001 and left_count > 0

-- Thread 2 queries; left_count is still 1, so a row is returned as well
select * from t_bonus where id = 10001 and left_count > 0

-- Thread 1 records the claim and changes left_count to 0
update t_bonus set left_count = left_count - 1 where id = 10001

-- Thread 2 records the claim and changes left_count to -1, producing dirty data
update t_bonus set left_count = left_count - 1 where id = 10001

Implementation with optimistic locking


-- Add a version number column to the red packet table
ALTER TABLE t_bonus ADD COLUMN version INT DEFAULT '0' NOT NULL AFTER left_count;

-- Thread 1 queries; left_count is currently 1 and the current version is 1234
select left_count, version from t_bonus where id = 10001 and left_count > 0

-- Thread 2 queries; left_count is currently 1 and the current version is 1234
select left_count, version from t_bonus where id = 10001 and left_count > 0

-- Thread 1 updates; the version becomes 1235 and the update reports 1 affected row, so it succeeded
update t_bonus set version = 1235, left_count = left_count-1 where id = 10001 and version = 1234

-- Thread 2 updates, but the version is already 1235, so the update reports 0 affected rows; the update failed and the business layer handles the failure
update t_bonus set version = 1235, left_count = left_count-1 where id = 10001 and version = 1234
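
The same check-and-update can be traced in plain Java. The sketch below is only an in-memory stand-in for one t_bonus row: the class and method names are hypothetical, and a real implementation would issue the guarded UPDATE shown above through JDBC or an ORM. The update succeeds only when the caller's version matches the stored one, and a caller that loses the race re-reads and retries.

```java
// In-memory stand-in for one t_bonus row; all names here are illustrative assumptions.
class BonusRow {
    private int leftCount;
    private int version;

    BonusRow(int leftCount, int version) {
        this.leftCount = leftCount;
        this.version = version;
    }

    // Snapshot of the row, like the "select left_count, version ..." query.
    synchronized int[] read() {
        return new int[] {leftCount, version};
    }

    // Mirrors the guarded UPDATE: bump the version and decrement left_count
    // only if the stored version still equals the one the caller read.
    // Returns the number of affected rows (1 or 0).
    synchronized int decrementIfVersionMatches(int expectedVersion) {
        if (version != expectedVersion) {
            return 0;
        }
        leftCount--;
        version++;
        return 1;
    }
}

class OptimisticClaim {
    // Tries to claim one red packet, retrying a bounded number of times on version conflicts.
    static boolean claim(BonusRow row, int maxRetries) {
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            int[] snapshot = row.read();
            if (snapshot[0] <= 0) {
                return false; // nothing left to claim
            }
            if (row.decrementIfVersionMatches(snapshot[1]) == 1) {
                return true; // our version was still current
            }
            // Another thread updated first; re-read and try again.
        }
        return false;
    }
}
```

Whether to retry or to fail immediately on a conflict is a business decision; the article's own example simply treats the failed update as an error to handle.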

Scenario 2: distributed locking based on Redis

SETNX command (SET if Not eXists)
Syntax: SETNX key value
Function: atomic operation. If and only if key does not exist, sets the value of key to value and returns 1; if the given key already exists, SETNX does nothing and returns 0.

EXPIRE command
Syntax: EXPIRE key seconds
Function: sets an expiration time on key.

GETSET command
Syntax: GETSET key value
Function: sets key to value and returns the old value of key. Returns an error if key exists but is not of string type, and returns nil if key does not exist.

GET command
Syntax: GET key
Function: returns the string value associated with key, or the special value nil if key does not exist.

DEL command
Syntax: DEL key [key ...]
Function: deletes one or more given keys; keys that do not exist are ignored.

Approach 1: distributed locking with the Redis setnx() and expire() methods

1. Call setnx(lockkey, 1). If it returns 0, the lock was not acquired; if it returns 1, the lock was acquired.
2. Call expire() to set a timeout on lockkey, so that a holder that never releases the lock cannot cause a deadlock.
3. After the business code has run, remove the key with the del command.

This scheme is good enough for everyday work, but from a technical point of view there is room for improvement. For example, if the process goes down after setnx in step 1 succeeds but before the expire() command succeeds, the key never expires and the deadlock problem still occurs.

Approach 2: distributed locking with the Redis setnx(), get() and getset() methods, which solves the deadlock problem

1. Call setnx(lockkey, current time + expiration timeout). If it returns 1, the lock has been acquired. If it returns 0, the lock was not acquired; go to step 2.
2. Call get(lockkey) to read oldExpireTime and compare it with the current system time. If it is less than the current system time, the lock is considered to have timed out and other requests may take it over; go to step 3.
3. Compute newExpireTime = current time + expiration timeout, then call getset(lockkey, newExpireTime), which returns the value currently stored under lockkey, currentExpireTime.
4. Compare currentExpireTime with oldExpireTime. If they are equal, the getset succeeded and the lock has been acquired. If they differ, another request took the lock first, and the current request can either return a failure or try again.
5. After acquiring the lock, the current thread runs its business processing. When it finishes, it compares its own processing time with the timeout set for the lock. If the processing time is shorter than the timeout, it executes del to release the lock. If it is longer, the lock has already expired and there is no need to release it.

import cn.com.tpig.cache.redis.RedisService;
import cn.com.tpig.utils.SpringUtils;

/**
 * Created by IDEA
 * User: shma1664
 * Date: 2016-08-16 14:01
 * Desc: Redis distributed lock
 */
public final class RedisLockUtil {

    // Default lock expiration, in seconds
    private static final int defaultExpire = 60;

    private RedisLockUtil() {
        // utility class, not meant to be instantiated
    }

    /**
     * Acquire the lock (approach 1: setnx + expire).
     * @param key redis key
     * @param expire expiration time in seconds
     * @return true if the lock was acquired, false otherwise
     */
    public static boolean lock(String key, int expire) {
        RedisService redisService = SpringUtils.getBean(RedisService.class);
        long status = redisService.setnx(key, "1");

        if (status == 1) {
            redisService.expire(key, expire);
            return true;
        }
        return false;
    }

    public static boolean lock(String key) {
        return lock2(key, defaultExpire);
    }

    /**
     * Acquire the lock (approach 2: setnx + get + getset).
     * @param key redis key
     * @param expire expiration time in seconds
     * @return true if the lock was acquired, false otherwise
     */
    public static boolean lock2(String key, int expire) {
        RedisService redisService = SpringUtils.getBean(RedisService.class);
        // expire is in seconds, while the stored value is an absolute time in milliseconds
        long value = System.currentTimeMillis() + expire * 1000L;
        long status = redisService.setnx(key, String.valueOf(value));
        if (status == 1) {
            return true;
        }
        long oldExpireTime = Long.parseLong(redisService.get(key, "0"));
        if (oldExpireTime < System.currentTimeMillis()) {
            // The existing lock has timed out; try to take it over
            long newExpireTime = System.currentTimeMillis() + expire * 1000L;
            long currentExpireTime = Long.parseLong(redisService.getSet(key, String.valueOf(newExpireTime)));
            // Equal values mean no other request replaced the lock in between
            if (currentExpireTime == oldExpireTime) {
                return true;
            }
        }
        return false;
    }

    // Release a lock taken with approach 1
    public static void unLock1(String key) {
        RedisService redisService = SpringUtils.getBean(RedisService.class);
        redisService.del(key);
    }

    // Release a lock taken with approach 2, but only if it has not expired yet
    public static void unLock2(String key) {
        RedisService redisService = SpringUtils.getBean(RedisService.class);
        long oldExpireTime = Long.parseLong(redisService.get(key, "0"));
        if (oldExpireTime > System.currentTimeMillis()) {
            redisService.del(key);
        }
    }
}


public void drawRedPacket(long userId) {
    String key = "draw.redpacket.userid:" + userId;
    boolean lock = RedisLockUtil.lock2(key, 60);
    if (lock) {
        try {
            // Claim the red packet
        } finally {
            // Release the lock
            RedisLockUtil.unLock2(key);
        }
    } else {
        throw new RuntimeException("Repeat claim");
    }
}
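
To see why approach 2 compares the value returned by getset with oldExpireTime, the takeover can be traced without a Redis server. The sketch below rests on several assumptions: a ConcurrentHashMap plays the role of Redis (putIfAbsent for SETNX, put for GETSET), the clock is passed in explicitly so the run is deterministic, and the class name is hypothetical.

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory model of approach 2; a map stands in for Redis.
class ExpiringLockModel {
    private final ConcurrentHashMap<String, Long> store = new ConcurrentHashMap<>();

    // nowMillis is injected instead of read from the system clock.
    boolean tryLock(String key, long nowMillis, long timeoutMillis) {
        long myExpireTime = nowMillis + timeoutMillis;
        // Step 1: SETNX succeeds only if nobody holds the key.
        if (store.putIfAbsent(key, myExpireTime) == null) {
            return true;
        }
        // Step 2: read the holder's expiry; give up if it is still in the future.
        Long oldExpireTime = store.get(key);
        if (oldExpireTime == null || oldExpireTime >= nowMillis) {
            // A null here means the lock was released in between; the caller may retry.
            return false;
        }
        // Step 3: GETSET swaps in our expiry and returns what was stored before.
        Long previous = store.put(key, myExpireTime);
        // Step 4: only the caller that replaced the stale value it observed wins.
        return oldExpireTime.equals(previous);
    }

    void unlock(String key) {
        store.remove(key);
    }
}
```

If two callers both observe the same stale oldExpireTime, the second swap returns the first caller's new expiry rather than the stale one, so its comparison fails and only one of them ends up holding the lock.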

An out-of-the-box Redis distributed lock built on Spring AOP, a custom annotation, and SpEL


import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
 * Custom annotation.
 * RetentionPolicy.RUNTIME: the compiler records the annotation in the class file and the VM
 * retains it at run time, so it can be read reflectively.
 * @author shma1664
 *
 */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface RedisLockable {
    // SpEL expressions that make up the lock key
    String[] key() default "";
    // Lock expiration time in seconds
    long expiration() default 60;
}

import javax.annotation.Resource;
import java.lang.reflect.Method;
import com.autohome.api.dealer.util.cache.RedisClient;
import com.google.common.base.Joiner;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.Signature;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.expression.EvaluationContext;
import org.springframework.expression.Expression;
import org.springframework.expression.ExpressionParser;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.expression.spel.support.StandardEvaluationContext;
import org.springframework.stereotype.Component;

/**
 * Created by IDEA
 * User: mashaohua
 * Date: 2016-09-28 18:08
 * Desc: aspect that takes a Redis lock around methods annotated with @RedisLockable
 */
@Aspect
@Component
public class RedisLockAop {

    @Resource
    private RedisClient redisClient;

    @Pointcut("execution(* com.autohome.api.dealer.tuan.service.*.*(..))")
    public void pointcut() {}

    @Around("pointcut()")
    public Object doAround(ProceedingJoinPoint point) throws Throwable {
        Signature signature = point.getSignature();
        MethodSignature methodSignature = (MethodSignature) signature;
        Method method = methodSignature.getMethod();
        String targetName = point.getTarget().getClass().getName();
        String methodName = point.getSignature().getName();
        Object[] arguments = point.getArgs();
        if (method != null && method.isAnnotationPresent(RedisLockable.class)) {
            RedisLockable redisLock = method.getAnnotation(RedisLockable.class);
            long expire = redisLock.expiration();
            String redisKey = getLockKey(targetName, methodName, redisLock.key(), arguments);
            // lock2 takes the expiration in seconds as an int
            boolean isLock = RedisLockUtil.lock2(redisKey, (int) expire);
            if (isLock) {
                // Lock acquired: run the business method, then release the lock
                try {
                    return point.proceed();
                } finally {
                    RedisLockUtil.unLock2(redisKey);
                }
            } else {
                throw new RuntimeException("Your operation is too frequent, please try again later");
            }
        }
        return point.proceed();
    }

    // Builds the key as lock.<class>.<method>#<value of the SpEL key expression>
    private String getLockKey(String targetName, String methodName, String[] keys, Object[] arguments) {
        StringBuilder sb = new StringBuilder();
        sb.append("lock.").append(targetName).append(".").append(methodName);
        String keyStr = keys == null ? "" : Joiner.on(".").skipNulls().join(keys);
        // Skip SpEL evaluation when no key expression was given (the annotation default is "")
        if (!keyStr.isEmpty()) {
            String[] parameters = ReflectParamNames.getNames(targetName, methodName);
            ExpressionParser parser = new SpelExpressionParser();
            Expression expression = parser.parseExpression(keyStr);
            EvaluationContext context = new StandardEvaluationContext();
            // Expose each method argument to SpEL under its parameter name
            for (int i = 0; i < parameters.length; i++) {
                context.setVariable(parameters[i], arguments[i]);
            }
            String keysValue = expression.getValue(context, String.class);
            sb.append("#").append(keysValue);
        }
        return sb.toString();
    }
}

<!-- https://mvnrepository.com/artifact/javassist/javassist -->
<dependency>
 <groupId>org.javassist</groupId>
 <artifactId>javassist</artifactId>
 <version>3.18.1-GA</version>
</dependency>

import javassist.*;
import javassist.bytecode.CodeAttribute;
import javassist.bytecode.LocalVariableAttribute;
import javassist.bytecode.MethodInfo;
import org.apache.log4j.Logger;

/**
 * Created by IDEA
 * User: mashaohua
 * Date: 2016-09-28 18:39
 * Desc: reads method parameter names from the class file with Javassist
 */
public class ReflectParamNames {

    private static Logger log = Logger.getLogger(ReflectParamNames.class);
    private static ClassPool pool = ClassPool.getDefault();

    static {
        ClassClassPath classPath = new ClassClassPath(ReflectParamNames.class);
        pool.insertClassPath(classPath);
    }

    public static String[] getNames(String className, String methodName) {
        CtClass cc = null;
        try {
            cc = pool.get(className);
            CtMethod cm = cc.getDeclaredMethod(methodName);
            // Use Javassist to get the parameter names of the method
            MethodInfo methodInfo = cm.getMethodInfo();
            CodeAttribute codeAttribute = methodInfo.getCodeAttribute();
            LocalVariableAttribute attr = (LocalVariableAttribute) codeAttribute.getAttribute(LocalVariableAttribute.tag);
            if (attr == null) return new String[0];
            String[] paramNames = new String[cm.getParameterTypes().length];
            // Static methods have no "this" entry, so their parameters start at index 0
            int begin = Modifier.isStatic(cm.getModifiers()) ? -1 : 0;
            for (int i = 0; i < attr.tableLength(); i++) {
                // Why this check: the order of the entries was observed to differ between
                // Windows and Linux; the actual parameters are the entries that follow "this"
                if (attr.variableName(i).equals("this")) {
                    begin = i;
                    break;
                }
            }
            int count = 0;
            for (int i = begin + 1; i <= begin + paramNames.length; i++) {
                paramNames[count] = attr.variableName(i);
                count++;
            }
            return paramNames;
        } catch (Exception e) {
            log.error(e.getMessage(), e);
        } finally {
            try {
                if (cc != null) cc.detach();
            } catch (Exception e2) {
                log.error(e2.getMessage());
            }
        }
        return new String[0];
    }
}
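
On Java 8 and later, parameter names can also be read through the standard reflection API instead of Javassist, provided the classes were compiled with the javac -parameters flag; without it the JVM reports synthetic names such as arg0. The sketch below is an alternative to the class above, not what the original code does, and its class and method names are made up for illustration.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Parameter;

// Hypothetical alternative to ReflectParamNames that uses only java.lang.reflect.
class ReflectionParamNames {

    // Returns the declared parameter names, or synthetic ones (arg0, arg1, ...)
    // when the class was compiled without -parameters.
    static String[] getNames(Method method) {
        Parameter[] parameters = method.getParameters();
        String[] names = new String[parameters.length];
        for (int i = 0; i < parameters.length; i++) {
            names[i] = parameters[i].getName();
        }
        return names;
    }

    // True only when real names are available for every parameter.
    static boolean hasRealNames(Method method) {
        for (Parameter parameter : method.getParameters()) {
            if (!parameter.isNamePresent()) {
                return false;
            }
        }
        return true;
    }

    // Small sample method used to try the helpers.
    static void sample(Integer orderId) { }

    static Method sampleMethod() {
        try {
            return ReflectionParamNames.class.getDeclaredMethod("sample", Integer.class);
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Checking hasRealNames before building a SpEL context avoids silently binding arguments to names like arg0 that no key expression refers to.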

Add the annotation wherever a distributed lock is needed


/**
 * Prize draw interface.
 * A Redis distributed lock ensures that only one request per order is processed at a time,
 * which prevents users from claiming prizes repeatedly. The key supports SpEL expressions.
 * redisLockKey: lock.com.autohome.api.dealer.tuan.service.impl.drawBonus#orderId
 * @param orderId the order id
 * @return information about the prize won
 */
@RedisLockable(key = {"#orderId"}, expiration = 120)
@Override
public BonusConvertBean drawBonus(Integer orderId) throws BonusException {
    // Business logic
}

Scenario 3: distributed locking based on ZooKeeper

Implementing an exclusive lock through the uniqueness of node names

ZooKeeper requires node names within the same directory to be unique. A znode on ZooKeeper is treated as a lock, and acquiring the lock is implemented by creating that znode. All clients try to create the /lock/${lock_name}_lock node, and the client whose create call succeeds holds the lock. A client that fails to create the node can either listen and wait, or give up and throw an exception; either way the lock is exclusive.
package com.shma.example.zookeeper.lock;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

/**
 * Created by IDEA
 * User: mashaohua
 * Date: 2016-09-30 16:09
 * Desc: exclusive lock based on the uniqueness of a znode name
 */
public class ZookeeperLock implements Lock, Watcher {

    private ZooKeeper zk;
    private String root = "/locks"; // Root node
    private String lockName; // Identifier of the contended resource
    private String myZnode; // Node that represents the current lock
    private int sessionTimeout = 30000;
    private List<Exception> exception = new ArrayList<Exception>();

    /**
     * Creates a distributed lock. Before use, make sure the ZooKeeper service given in config is available.
     * @param config 127.0.0.1:2181
     * @param lockName identifier of the contended resource; lockName must not contain the word lock
     */
    public ZookeeperLock(String config, String lockName) {
        this.lockName = lockName;
        // Create a connection to the server
        try {
            zk = new ZooKeeper(config, sessionTimeout, this);
            Stat stat = zk.exists(root, false);
            if (stat == null) {
                // Create the root node
                zk.create(root, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
        } catch (IOException e) {
            exception.add(e);
        } catch (KeeperException e) {
            exception.add(e);
        } catch (InterruptedException e) {
            exception.add(e);
        }
    }

    @Override
    public void lock() {
        if (exception.size() > 0) {
            throw new LockException(exception.get(0));
        }
        if (!tryLock()) {
            throw new LockException("Your operation is too frequent, please try again later");
        }
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        this.lock();
    }

    @Override
    public boolean tryLock() {
        try {
            // EPHEMERAL so that the lock node disappears if the holder's session ends
            // without unlock() being called; a PERSISTENT node would stay locked forever
            myZnode = zk.create(root + "/" + lockName, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
            return true;
        } catch (KeeperException e) {
            // A NodeExistsException here means another client holds the lock
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return false;
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return tryLock();
    }

    @Override
    public void unlock() {
        try {
            // Version -1 matches any version of the node
            zk.delete(myZnode, -1);
            myZnode = null;
            zk.close();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
    }

    @Override
    public Condition newCondition() {
        // Conditions are not supported by this lock
        return null;
    }

    @Override
    public void process(WatchedEvent watchedEvent) {
        // No watch handling is needed for the exclusive lock
    }
}


Implementing an ordered lock with ephemeral sequential nodes

The /lock node already exists. Every client creates an ephemeral sequential node under it and, as in master election, the client whose node has the smallest sequence number gets the lock. It deletes its node when it is done, so the lock passes on in order.

Algorithm idea: to lock, every client creates an ephemeral sequential node in the /lock directory. If a client finds that the node it created has the smallest sequence number in /lock, it holds the lock. Otherwise it watches the node that immediately precedes its own (the largest node smaller than the one it created) and waits.

For the unlock operation, simply remove the node you created.
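
The decision each client makes after listing the children of /lock can be sketched as a pure function. The sketch assumes that node names end in the 10-digit zero-padded sequence number that ZooKeeper appends to sequential nodes (for example lock-0000000003). The class name, method names, and the lock- prefix are illustrative, and the calls that create the node, list the children, and set the watch are left out.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

// Hypothetical helper: decides whether a client holds the lock or which node it should watch.
class SequentialLockOrder {

    // Extracts the trailing 10-digit sequence number from a node name.
    static int sequenceOf(String nodeName) {
        return Integer.parseInt(nodeName.substring(nodeName.length() - 10));
    }

    // Returns empty if myNode has the smallest sequence number (lock acquired),
    // otherwise the node with the largest sequence number below myNode's.
    static Optional<String> nodeToWatch(List<String> children, String myNode) {
        List<String> sorted = new ArrayList<>(children);
        sorted.sort(Comparator.comparingInt(SequentialLockOrder::sequenceOf));
        int myIndex = sorted.indexOf(myNode);
        if (myIndex < 0) {
            throw new IllegalArgumentException("own node is not among the children: " + myNode);
        }
        return myIndex == 0 ? Optional.empty() : Optional.of(sorted.get(myIndex - 1));
    }
}
```

Watching only the immediate predecessor means that deleting a node wakes a single waiter rather than every client queued behind it.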


