Simple annotations to implement cluster synchronization locks (Spring + Redis + annotations)

  • 2020-06-01 09:43:28
  • OfStack

In interviews at Internet companies, interviewers often ask how to keep concurrent data operations safe in a clustered environment. The familiar synchronized keyword is not enough, because it only coordinates threads inside a single JVM. Locking the data in the database with for update is one option. With the solution in this article, you only need to add a @P4jSyn annotation to a method to get the same effect as synchronized across a cluster, and the lock key can be chosen freely. The annotation also supports a timeout mechanism for locks.

This article assumes some familiarity with Redis, Spring, and spring-data-redis. You can also borrow the same idea to cache a method's return value through an annotation, similar to @ReadThroughSingleCache in com.google.code.simple-spring-memcached.

Step 1: introduce the two custom annotations P4jSyn and P4jSynKey

P4jSyn: required. Placed on a method, it indicates that the method needs a cluster-wide synchronization lock.

P4jSynKey: optional. Placed on a method parameter, it makes that parameter's value part of the lock key, so that different argument values use different locks (finer lock granularity). If no parameter carries @P4jSynKey, only the synKey of P4jSyn is used as the lock key.


package com.yaoguoyin.redis.lock; 
import java.lang.annotation.ElementType; 
import java.lang.annotation.Inherited; 
import java.lang.annotation.Retention; 
import java.lang.annotation.RetentionPolicy; 
import java.lang.annotation.Target; 
/** 
 * <b>Synchronization lock:</b><br/> 
 * Its main purpose is to keep a method synchronized across the servers of a cluster.<br/> 
 * Annotating a method makes its executions mutually exclusive; it does not guarantee the order in which concurrent calls run.<br/> 
 * If task A holds the lock longer than the maximum allowed holding time and task B then acquires the lock, B acquiring the lock does not terminate the execution of task A.<br/> 
 * <br/> 
 * <b>Note:</b><br/> 
 * Choose keepMills, toWait, sleepMills, maxSleepMills and the other attributes to fit your usage scenario.<br/> 
 * Requires Redis, Spring, and spring-data-redis; the lock relies on Redis features such as NX. 
 * 
 * @see com.yaoguoyin.redis.lock.P4jSynKey 
 * @see com.yaoguoyin.redis.lock.RedisLockAspect 
 * 
 * @author partner4java 
 * 
 */ 
@Target({ ElementType.METHOD }) 
@Retention(RetentionPolicy.RUNTIME) 
@Inherited 
public @interface P4jSyn { 
 /** 
 * The lock key.<br/> 
 * For finer-grained, non-fixed locks, add the @P4jSynKey annotation to method parameters; synKey itself is still required.<br/> 
 * The Redis key is built as "RedisSyn+" + synKey + the @P4jSynKey parameter values.<br/> 
 * 
 */ 
 String synKey(); 
 /** 
 * Lock holding time (expiry): a lock held longer than this is given up automatically.<br/> 
 * In milliseconds; default 20 seconds.<br/> 
 * 0 means the lock never expires; in that case setting toWait to true makes little sense.<br/> 
 * Unless the business strictly requires it, setting this to 0 is not recommended. 
 */ 
 long keepMills() default 20 * 1000; 
 /** 
 * Whether to keep waiting or give up when the lock cannot be acquired.<br/> 
 * Defaults to waiting. 
 */ 
 boolean toWait() default true; 
 /** 
 * If the lock is not acquired and toWait() is true, sleep this many milliseconds before trying again; this is the polling interval for acquiring the lock.<br/> 
 * Default: 10 ms. 
 * 
 * @return the polling interval in milliseconds 
 */ 
 long sleepMills() default 10; 
 /** 
 * Lock acquisition timeout:<br/> 
 * If the lock is not acquired and toWait() is true, this is the maximum time to keep waiting; when it is exceeded, 
 * {@link java.util.concurrent.TimeoutException} 
 * is thrown, which callers can catch to do their own business handling.<br/> 
 * In milliseconds; default 1 minute. If set to 0 there is no timeout and the caller waits indefinitely. 
 * 
 * @return the maximum wait time in milliseconds 
 */ 
 long maxSleepMills() default 60 * 1000; 
} 
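
To make the interplay of toWait, sleepMills and maxSleepMills concrete, here is a minimal sketch of the polling loop those attributes describe. WaitPolicyDemo, acquire, and the BooleanSupplier that stands in for a single lock attempt are hypothetical names chosen for illustration; they are not part of the article's code.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BooleanSupplier;

class WaitPolicyDemo {
    /**
     * Polls tryLock until it succeeds.
     * Returns false if toWait is false and an attempt fails;
     * throws TimeoutException once maxSleepMills (> 0) has elapsed.
     */
    static boolean acquire(BooleanSupplier tryLock, boolean toWait,
                           long sleepMills, long maxSleepMills)
            throws InterruptedException, TimeoutException {
        long deadline = System.currentTimeMillis() + maxSleepMills;
        while (!tryLock.getAsBoolean()) {
            if (!toWait) {
                return false; // give up without waiting
            }
            // maxSleepMills == 0 means "no timeout": keep polling forever
            if (maxSleepMills > 0 && System.currentTimeMillis() > deadline) {
                throw new TimeoutException("Timed out waiting to acquire the lock");
            }
            TimeUnit.MILLISECONDS.sleep(sleepMills);
        }
        return true;
    }

    public static void main(String[] args) throws Exception {
        int[] attempts = {0};
        // Hypothetical lock attempt that succeeds on the third try.
        boolean locked = acquire(() -> ++attempts[0] >= 3, true, 10, 1000);
        System.out.println("locked=" + locked + ", attempts=" + attempts[0]);
    }
}
```

With toWait set to false the caller gets an immediate answer, while a positive maxSleepMills bounds how long a caller can be blocked.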

package com.yaoguoyin.redis.lock; 
import java.lang.annotation.ElementType; 
import java.lang.annotation.Inherited; 
import java.lang.annotation.Retention; 
import java.lang.annotation.RetentionPolicy; 
import java.lang.annotation.Target; 
/** 
 * <b>Synchronization lock key</b><br/> 
 * Placed on a method parameter; the value of that parameter becomes part of the lock key. 
 * 
 * @author partner4java 
 * 
 */ 
@Target({ ElementType.PARAMETER }) 
@Retention(RetentionPolicy.RUNTIME) 
@Inherited 
public @interface P4jSynKey { 
 /** 
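 * Position of this parameter when the key parts are concatenated (lower index first). 
 * 
 * @return the position of this parameter in the key 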
 */ 
 int index() default 0; 
} 

The Javadoc comments above already describe both annotations in detail, so they are not explained again here.
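
As a quick illustration of how the attribute defaults behave, the sketch below overrides two attributes on a method and reads all of them back through reflection, which is also how an aspect gets at them at runtime. DemoSyn is a hypothetical stand-in that copies the attributes and defaults of P4jSyn so the example compiles on its own; AnnotationDefaultsDemo, placeOrder and read are likewise made up for illustration.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

class AnnotationDefaultsDemo {
    // Hypothetical stand-in mirroring the attributes and defaults of P4jSyn.
    @Target(ElementType.METHOD)
    @Retention(RetentionPolicy.RUNTIME)
    @interface DemoSyn {
        String synKey();
        long keepMills() default 20 * 1000;
        boolean toWait() default true;
        long sleepMills() default 10;
        long maxSleepMills() default 60 * 1000;
    }

    // Fail fast: hold the lock for at most 5 s and do not wait if it is taken.
    @DemoSyn(synKey = "order", keepMills = 5000, toWait = false)
    public void placeOrder() {
    }

    /** Reads the annotation of a public no-argument method by name. */
    static DemoSyn read(String methodName) {
        try {
            return AnnotationDefaultsDemo.class.getMethod(methodName)
                    .getAnnotation(DemoSyn.class);
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException("No such method: " + methodName, e);
        }
    }

    public static void main(String[] args) {
        DemoSyn syn = read("placeOrder");
        System.out.println(syn.synKey() + " keepMills=" + syn.keepMills()
                + " toWait=" + syn.toWait() + " sleepMills=" + syn.sleepMills()
                + " maxSleepMills=" + syn.maxSleepMills());
    }
}
```

Attributes that are not set on the method (sleepMills and maxSleepMills here) come back with their declared defaults.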

Usage example:


package com.yaoguoyin.redis.lock; 
import org.springframework.stereotype.Component; 
@Component 
public class SysTest { 
 private static int i = 0; 
 @P4jSyn(synKey = "12345") 
 public void add(@P4jSynKey(index = 1) String key, @P4jSynKey(index = 0) int key1) { 
 i++; 
 System.out.println("i=-===========" + i); 
 } 
} 
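
To see which Redis key this example ends up locking on, the following self-contained sketch reproduces the key-building rule ("RedisSyn+" + synKey + annotated parameter values ordered by index) with plain reflection. KeyPart is a hypothetical stand-in for P4jSynKey, and LockKeyDemo, buildKey and keyForAdd are illustrative names only.

```java
import java.lang.annotation.Annotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.SortedMap;
import java.util.TreeMap;

class LockKeyDemo {
    // Hypothetical stand-in for P4jSynKey so the sketch compiles on its own.
    @Target(ElementType.PARAMETER)
    @Retention(RetentionPolicy.RUNTIME)
    @interface KeyPart {
        int index() default 0;
    }

    // Same parameter layout as SysTest.add.
    public void add(@KeyPart(index = 1) String key, @KeyPart(index = 0) int key1) {
    }

    /** Builds "RedisSyn+" + synKey + annotated argument values ordered by index. */
    static String buildKey(Method method, String synKey, Object... args) {
        SortedMap<Integer, String> parts = new TreeMap<>();
        Annotation[][] paramAnnotations = method.getParameterAnnotations();
        for (int i = 0; i < paramAnnotations.length; i++) {
            for (Annotation a : paramAnnotations[i]) {
                if (a instanceof KeyPart && args[i] != null) {
                    parts.put(((KeyPart) a).index(), args[i].toString());
                }
            }
        }
        StringBuilder key = new StringBuilder("RedisSyn+").append(synKey);
        for (String part : parts.values()) {
            key.append(part);
        }
        return key.toString();
    }

    /** Key that a call add(key, key1) would lock on with synKey "12345". */
    static String keyForAdd(String key, int key1) {
        try {
            Method add = LockKeyDemo.class.getMethod("add", String.class, int.class);
            return buildKey(add, "12345", key, key1);
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(keyForAdd("xxxxx", 111111)); // RedisSyn+12345111111xxxxx
    }
}
```

Two properties of this rule are worth keeping in mind: the parts are concatenated without a separator, so different argument combinations can produce the same key, and two parameters that share the same index overwrite each other in the sorted map.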

Step 2: aspect-oriented programming

The most straightforward way to add synchronization without touching the original code is aspect-oriented programming (AOP).
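
The idea of wrapping a call without changing the called code can be sketched with a plain JDK dynamic proxy, without Spring. This is only an illustration of the shape of an around advice: AroundAdviceDemo, Counter and withLock are hypothetical names, and the "acquire" and "release" log entries stand in for real lock calls.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

class AroundAdviceDemo {
    interface Counter {
        int add();
    }

    // Records the order of events so the wrapping is visible.
    static final List<String> LOG = new ArrayList<>();

    /** Returns a proxy that runs "acquire" before and "release" after every call. */
    static Counter withLock(Counter target) {
        InvocationHandler handler = (proxy, method, args) -> {
            LOG.add("acquire");
            try {
                // Plays the role of pjp.proceed() in an @Around advice.
                return method.invoke(target, args);
            } finally {
                LOG.add("release"); // runs even if the target throws
            }
        };
        return (Counter) Proxy.newProxyInstance(
                Counter.class.getClassLoader(), new Class<?>[] {Counter.class}, handler);
    }

    public static void main(String[] args) {
        int[] i = {0};
        Counter counter = withLock(() -> {
            LOG.add("add");
            return ++i[0];
        });
        int result = counter.add();
        System.out.println(result + " " + LOG);
    }
}
```

The business method (the lambda) knows nothing about locking; the wrapper decides what happens before and after it. Note that method.invoke wraps exceptions from the target in InvocationTargetException, which a real implementation would need to unwrap.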


package com.yaoguoyin.redis.lock; 
import java.lang.annotation.Annotation; 
import java.lang.reflect.Method; 
import java.util.SortedMap; 
import java.util.TreeMap; 
import java.util.concurrent.TimeUnit; 
import java.util.concurrent.TimeoutException; 
import org.aspectj.lang.ProceedingJoinPoint; 
import org.aspectj.lang.annotation.Around; 
import org.aspectj.lang.annotation.Aspect; 
import org.aspectj.lang.reflect.MethodSignature; 
import org.springframework.beans.factory.annotation.Autowired; 
import org.springframework.beans.factory.annotation.Qualifier; 
import org.springframework.data.redis.core.BoundValueOperations; 
import org.springframework.data.redis.core.RedisTemplate; 
/** 
 * Lock aspect.<br/> 
 * Locks every method annotated with @P4jSyn. 
 * 
 * @see com.yaoguoyin.redis.lock.P4jSyn 
 * 
 * @author partner4java 
 * 
 */ 
@Aspect 
public class RedisLockAspect { 
 @Autowired 
 @Qualifier("redisTemplate") 
 private RedisTemplate<String, Long> redisTemplate; 
 @Around("execution(* com.yaoguoyin..*(..)) && @annotation(com.yaoguoyin.redis.lock.P4jSyn)") 
 public Object lock(ProceedingJoinPoint pjp) throws Throwable { 
 P4jSyn lockInfo = getLockInfo(pjp); 
 if (lockInfo == null) { 
  throw new IllegalArgumentException("Configuration parameter error"); 
 } 
 String synKey = getSynKey(pjp, lockInfo.synKey()); 
 if (synKey == null || "".equals(synKey)) { 
  throw new IllegalArgumentException("Configuration parameter synKey error"); 
 } 
 boolean lock = false; 
 Object obj = null; 
 try { 
  // Deadline for waiting to acquire the lock 
  long maxSleepMills = System.currentTimeMillis() + lockInfo.maxSleepMills(); 
  while (!lock) { 
   long keepMills = System.currentTimeMillis() + lockInfo.keepMills(); 
   lock = setIfAbsent(synKey, keepMills); 
   // Lock acquired: no one else currently holds this key 
  if (lock) { 
   obj = pjp.proceed(); 
  } 
   // Not acquired, and the lock is configured never to expire 
   else if (lockInfo.keepMills() <= 0) { 
    // Keep waiting for the lock 
    if (lockInfo.toWait()) { 
    // Throw an exception once the maximum wait time is exceeded 
    if (lockInfo.maxSleepMills() > 0 && System.currentTimeMillis() > maxSleepMills) { 
     throw new TimeoutException("Timed out waiting to acquire the lock"); 
   } 
   TimeUnit.MILLISECONDS.sleep(lockInfo.sleepMills()); 
   } else { 
   break; 
   } 
  } 
   // The lock has expired, and the old timestamp returned by getAndSet is also expired: treat the lock as acquired 
  else if (System.currentTimeMillis() > getLock(synKey) && (System.currentTimeMillis() > getSet(synKey, keepMills))) { 
   lock = true; 
   obj = pjp.proceed(); 
  } 
   // Lock not acquired 
   else { 
    // Keep waiting for the lock 
    if (lockInfo.toWait()) { 
    // Throw an exception once the maximum wait time is exceeded 
    if (lockInfo.maxSleepMills() > 0 && System.currentTimeMillis() > maxSleepMills) { 
     throw new TimeoutException("Timed out waiting to acquire the lock"); 
   } 
   TimeUnit.MILLISECONDS.sleep(lockInfo.sleepMills()); 
   } 
   //  Give up waiting  
   else { 
   break; 
   } 
  } 
  } 
 } catch (Exception e) { 
  e.printStackTrace(); 
  throw e; 
 } finally { 
  //  If a lock is acquired, release the lock  
  if (lock) { 
  releaseLock(synKey); 
  } 
 } 
 return obj; 
 } 
 /** 
 * Builds the lock key, including the annotated method parameters.<br/> 
 * The Redis key is built as "RedisSyn+" + synKey + the @P4jSynKey parameter values. 
 * 
 */ 
 private String getSynKey(ProceedingJoinPoint pjp, String synKey) { 
 try { 
  synKey = "RedisSyn+" + synKey; 
  Object[] args = pjp.getArgs(); 
  if (args != null && args.length > 0) { 
  MethodSignature methodSignature = (MethodSignature) pjp.getSignature(); 
  Annotation[][] paramAnnotationArrays = methodSignature.getMethod().getParameterAnnotations(); 
  SortedMap<Integer, String> keys = new TreeMap<Integer, String>(); 
 
  for (int ix = 0; ix < paramAnnotationArrays.length; ix++) { 
   P4jSynKey p4jSynKey = getAnnotation(P4jSynKey.class, paramAnnotationArrays[ix]); 
   if (p4jSynKey != null) { 
   Object arg = args[ix]; 
   if (arg != null) { 
    keys.put(p4jSynKey.index(), arg.toString()); 
   } 
   } 
  } 
  if (keys != null && keys.size() > 0) { 
   for (String key : keys.values()) { 
   synKey = synKey + key; 
   } 
  } 
  } 
  return synKey; 
 } catch (Exception e) { 
  e.printStackTrace(); 
 } 
 return null; 
 } 
 @SuppressWarnings("unchecked") 
 private static <T extends Annotation> T getAnnotation(final Class<T> annotationClass, final Annotation[] annotations) { 
 if (annotations != null && annotations.length > 0) { 
  for (final Annotation annotation : annotations) { 
  if (annotationClass.equals(annotation.annotationType())) { 
   return (T) annotation; 
  } 
  } 
 } 
 return null; 
 } 
 /** 
 * Gets the @P4jSyn annotation of the intercepted method. 
 */ 
 private P4jSyn getLockInfo(ProceedingJoinPoint pjp) { 
 try { 
  MethodSignature methodSignature = (MethodSignature) pjp.getSignature(); 
  Method method = methodSignature.getMethod(); 
  P4jSyn lockInfo = method.getAnnotation(P4jSyn.class); 
  return lockInfo; 
 } catch (Exception e) { 
  e.printStackTrace(); 
 } 
 return null; 
 } 
 public BoundValueOperations<String, Long> getOperations(String key) { 
 return redisTemplate.boundValueOps(key); 
 } 
 /** 
 * Set {@code value} for {@code key}, only if {@code key} does not exist. 
 * <p> 
 * See http://redis.io/commands/setnx 
 * 
 * @param key 
 *  must not be {@literal null}. 
 * @param value 
 *  must not be {@literal null}. 
 * @return 
 */ 
 public boolean setIfAbsent(String key, Long value) { 
 return getOperations(key).setIfAbsent(value); 
 } 
 public long getLock(String key) { 
 Long time = getOperations(key).get(); 
 if (time == null) { 
  return 0; 
 } 
 return time; 
 } 
 public long getSet(String key, Long value) { 
 Long time = getOperations(key).getAndSet(value); 
 if (time == null) { 
  return 0; 
 } 
 return time; 
 } 
 public void releaseLock(String key) { 
 redisTemplate.delete(key); 
 } 
} 

RedisLockAspect intercepts the annotated methods and handles them specially; see the lock method.

The general idea is:

1. First, rely on the setIfAbsent operation that Redis itself provides. If the key already exists in Redis, nothing is stored and it returns false; if the key does not exist, the value is stored and it returns true.

2. If setIfAbsent returns true, the synchronization lock has been acquired: run the method, then release the lock when it finishes.

3. If setIfAbsent did not acquire the lock, check whether the lock has an expiry time. If it has none (keepMills <= 0), decide whether to keep waiting.

4. Check whether the lock has expired with System.currentTimeMillis() > getLock(synKey) && (System.currentTimeMillis() > getSet(synKey, keepMills)). On closer inspection, getSet(synKey, keepMills) may overwrite the expiry time of a lock that someone else has just acquired, but the difference is almost negligible.

5. If no lock was acquired, decide whether to keep waiting or to give up.
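
The steps above can be sketched without Redis by using an in-memory map as a stand-in. This is an illustration under assumptions, not the article's implementation: ExpiringLockDemo and tryAcquire are hypothetical names, putIfAbsent plays the role of setIfAbsent, put (which returns the previous value) plays the role of getAndSet, the current time is passed in so the behavior is easy to follow, and the never-expiring case of step 3 is left out.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class ExpiringLockDemo {
    // In-memory stand-in for Redis: key -> expiry timestamp in milliseconds.
    private final ConcurrentMap<String, Long> store = new ConcurrentHashMap<>();

    /** One acquisition attempt at time now; keepMills is assumed to be > 0. */
    boolean tryAcquire(String key, long now, long keepMills) {
        long expiresAt = now + keepMills;
        // Steps 1-2: succeeds only if the key is absent.
        if (store.putIfAbsent(key, expiresAt) == null) {
            return true;
        }
        // Step 4: has the current holder's expiry time passed?
        Long current = store.get(key);
        if (current == null || now > current) {
            // Swap in our expiry and check the value we replaced. If another
            // caller swapped first, previous is not expired and we lose.
            Long previous = store.put(key, expiresAt);
            return previous == null || now > previous;
        }
        // Step 5: still held by someone else; the caller waits or gives up.
        return false;
    }

    void release(String key) {
        store.remove(key);
    }

    public static void main(String[] args) {
        ExpiringLockDemo lock = new ExpiringLockDemo();
        System.out.println(lock.tryAcquire("k", 1_000, 500)); // true: key was absent
        System.out.println(lock.tryAcquire("k", 1_200, 500)); // false: held until 1500
        System.out.println(lock.tryAcquire("k", 1_600, 500)); // true: previous lock expired
    }
}
```

The losing caller in the swap has still overwritten the stored expiry with its own value, which is the small inaccuracy that step 4 describes as negligible.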

Step 3: basic Spring configuration


#***************** Jedis connection parameters *********************# 

# Redis server IP 
redis.hostName=127.0.0.1 

# Redis server port 
redis.port=6379 

# Redis server access password 
redis.password=XXXXXXXXXX 

#************************ Jedis pool parameters *******************# 

# Maximum number of objects the Jedis pool may allocate 
jedis.pool.maxActive=1000 

# Minimum number of idle objects kept in the pool 
jedis.pool.minIdle=100 

# Maximum number of idle objects kept in the pool 
jedis.pool.maxIdle=1000 

# Maximum wait time in milliseconds when the pool has no object to hand out 
jedis.pool.maxWait=5000 

# Whether to validate an object when borrowObject is called 
jedis.pool.testOnBorrow=true 

# Whether to validate an object when returnObject is called 
jedis.pool.testOnReturn=true 

<?xml version="1.0" encoding="UTF-8"?> 
<beans xmlns="http://www.springframework.org/schema/beans" 
 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
 xmlns:p="http://www.springframework.org/schema/p" 
 xmlns:context="http://www.springframework.org/schema/context" 
 xmlns:jee="http://www.springframework.org/schema/jee" 
 xmlns:tx="http://www.springframework.org/schema/tx" 
 xmlns:aop="http://www.springframework.org/schema/aop" 
 xmlns:redis="http://www.springframework.org/schema/redis" 
 xmlns:cache="http://www.springframework.org/schema/cache" 
 xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd 
 http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.2.xsd 
 http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.1.xsd 
 http://www.springframework.org/schema/redis http://www.springframework.org/schema/redis/spring-redis.xsd 
 http://www.springframework.org/schema/cache http://www.springframework.org/schema/cache/spring-cache.xsd"> 
 <!-- Enable AspectJ annotation support (auto-proxying) --> 
 <aop:aspectj-autoproxy /> 
 <bean class="com.yaoguoyin.redis.lock.RedisLockAspect" /> 
 <!-- Package scope for component scanning --> 
 <context:component-scan base-package="com.yaoguoyin" /> 
 <!-- Load the Redis configuration --> 
 <context:property-placeholder location="classpath:config.properties" /> 
 <!-- Connection pool --> 
 <bean id="poolConfig" class="redis.clients.jedis.JedisPoolConfig"> 
 <property name="minIdle" value="${jedis.pool.minIdle}" /> 
 <property name="maxIdle" value="${jedis.pool.maxIdle}" /> 
 <property name="maxWaitMillis" value="${jedis.pool.maxWait}" /> 
 </bean> 
 <!-- p:password="${redis.pass}" --> 
 <bean id="redisConnectionFactory" class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory" p:host-name="${redis.hostName}" p:port="${redis.port}" 
 p:password="${redis.password}" p:pool-config-ref="poolConfig" /> 
 <!--  Similar to the jdbcTemplate --> 
 <bean id="redisTemplate" class="org.springframework.data.redis.core.RedisTemplate" p:connection-factory-ref="redisConnectionFactory" /> 
</beans> 

Installing Redis is not covered in this article.

Test


package com.yaoguoyin.redis; 
import org.junit.runner.RunWith; 
import org.springframework.test.context.ContextConfiguration; 
import org.springframework.test.context.junit4.AbstractJUnit4SpringContextTests; 
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; 
@RunWith(SpringJUnit4ClassRunner.class) 
@ContextConfiguration(locations = { "classpath:META-INF/spring/redis.xml" }) 
public class BaseTest extends AbstractJUnit4SpringContextTests { 
} 

package com.yaoguoyin.redis.lock; 
import java.util.concurrent.TimeUnit; 
import org.junit.Test; 
import org.springframework.beans.factory.annotation.Autowired; 
import com.yaoguoyin.redis.BaseTest; 
public class RedisTest extends BaseTest { 
 @Autowired 
 private SysTest sysTest; 
 @Test 
 public void testHello() throws InterruptedException { 
 for (int i = 0; i < 100; i++) { 
  new Thread(new Runnable() { 
  @Override 
  public void run() { 
   try { 
   TimeUnit.SECONDS.sleep(1); 
   } catch (InterruptedException e) { 
   e.printStackTrace(); 
   } 
   sysTest.add("xxxxx", 111111); 
  } 
  }).start(); 
 } 
 TimeUnit.SECONDS.sleep(20); 
 } 
 @Test 
 public void testHello2() throws InterruptedException{ 
 sysTest.add("xxxxx", 111111); 
 TimeUnit.SECONDS.sleep(10); 
 } 
} 

For comparison, you can remove the @P4jSyn annotation from

void com.yaoguoyin.redis.lock.SysTest.add(@P4jSynKey(index=1) String key, @P4jSynKey(index=0) int key1)

and run the test again.
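
If no Redis instance is at hand, the effect of the guard can be tried in a single process. The sketch below is an assumption-laden stand-in for the test above: CounterComparisonDemo, guardedAdd and run are hypothetical names, a ConcurrentHashMap replaces Redis, Thread.yield() replaces sleeping for sleepMills, and lock expiry is not modeled.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class CounterComparisonDemo {
    private static final String KEY = "RedisSyn+12345";
    // In-memory stand-in for the Redis key space (an assumption of this sketch).
    private static final ConcurrentMap<String, Long> LOCKS = new ConcurrentHashMap<>();
    private static volatile int counter = 0;

    // Mimics the guarded SysTest.add: spin until the key can be set, then increment.
    static void guardedAdd() {
        while (LOCKS.putIfAbsent(KEY, System.currentTimeMillis()) != null) {
            Thread.yield(); // stand-in for sleeping between attempts
        }
        try {
            counter++; // not atomic by itself; safe only while the lock is held
        } finally {
            LOCKS.remove(KEY); // release the lock
        }
    }

    /** Starts the given number of threads, each calling guardedAdd once. */
    static int run(int threads) {
        counter = 0;
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(CounterComparisonDemo::guardedAdd);
            workers[t].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return counter;
    }

    public static void main(String[] args) {
        System.out.println("i=" + run(100));
    }
}
```

With the guard in place every increment is counted. Replacing the body of guardedAdd with a bare counter++ corresponds to removing the annotation: updates may then be lost, although a short run will not necessarily show it.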

P.S. The performance of this demo depends on the network distance (latency) between Redis and the Java application. For heavy contention on a single lock this approach is not recommended; handle that case directly in Redis or with another solution. This demo only targets low-concurrency cases where you do not want locking logic coupled into the business code.

