Using consul to Implement Distributed Lock Scenario Analysis in spring boot

  • 2021-11-13 07:52:22
  • OfStack

Because the actual process of the project is used in the micro-service architecture, considering that the carrying capacity of each service of the same business is basically multi-node deployment, the access to some resources has to use distributed locks.

Here is one of the simplest scenarios, If there is an intelligent vending machine, because of the machine itself, it can't produce two goods at the same time with one machine, which requires that only one order can be successfully created when concurrent orders are created for the same machine at the same time before the shipment process, but the order service is deployed by multiple nodes, so distributed locks have to be used.

The above is just a simple business scenario, In various large-scale Internet practical application, There will be more business scenarios requiring distributed locks, This paper comprehensively compares the distributed locking schemes based on various middleware in the industry, and finally decides to adopt consul in combination with the actual business, because consul is used as the registry in our project, and consul can guarantee uniformity by nature (this is similar to zk). Of course, zk can also realize distributed locking, but this point will not be discussed too much here.

Although redis can also realize distributed locking, But maybe because the scene is complicated, If redis is deployed with cluster, If a 1 master node fails, there is a definite probability that brain schizophrenia will occur, which may make competitors get locks at the same time when they are concurrent, which may destroy the following business. Of course, the probability of this situation is very low, but it cannot be completely ruled out, because redis cannot guarantee strong uniformity at all.

Well, the simplest distributed lock here means, When multiple competitors concurrently acquire a lock in the same time, If the acquisition fails, it will return directly, and if the acquisition succeeds, it will continue the subsequent process, and then release the lock at an appropriate time, and add a timeout time to the lock, so as to prevent the process or thread that obtained the lock from hanging up before releasing the lock, resulting in the resource being locked in a state of 1 and unable to be released. The main implementation logic is like this, if someone wants to implement the lock loss

The defeated competitor 1 straight continues to try to gain, which can be modified based on this example, plus spin logic on OK.

Here is the lock implementation code:


package com.lyb.consullock;

import com.ecwid.consul.v1.ConsulClient;
import com.ecwid.consul.v1.agent.model.NewCheck;
import com.ecwid.consul.v1.kv.model.PutParams;
import com.ecwid.consul.v1.session.model.NewSession;
import com.ecwid.consul.v1.session.model.Session;
import lombok.Data;


import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;


public class DistributedLock{
    private ConsulClient consulClient;

    /**
     *  Constructor 
     * @param consulHost  Registration consul Adj. client Or server-side Ip Or host name, or domain name 
     * @param consulPort  Port number 
     */
    public DistributedLock(String consulHost,int consulPort){
        consulClient = new ConsulClient(consulHost,consulPort);
    }

    /**
     *  The method of obtaining the lock 
     * @param lockName  Competing resource name 
     * @param ttlSeconds  The timeout time of the lock, beyond which it is automatically released 
     * @return
     */
    public LockContext getLock(String lockName,int ttlSeconds){
        LockContext lockContext = new LockContext();
        if(ttlSeconds<10 || ttlSeconds > 86400) ttlSeconds = 60;
        String sessionId = createSession(lockName,ttlSeconds);
        boolean success = lock(lockName,sessionId);
        if(success == false){
            consulClient.sessionDestroy(sessionId,null);
            lockContext.setGetLock(false);

            return lockContext;
        }

        lockContext.setSession(sessionId);
        lockContext.setGetLock(true);

        return lockContext;
    }

    /**
     *  Release lock 
     * @param sessionID
     */
    public void releaseLock(String sessionID){
        consulClient.sessionDestroy(sessionID,null);
    }

    private String createSession(String lockName,int ttlSeconds){
        NewCheck check = new NewCheck();
        check.setId("check "+lockName);
        check.setName(check.getId());
        check.setTtl(ttlSeconds+"s"); // This value and session ttl Decide together to decide the locking time 
        check.setTimeout("10s");
        consulClient.agentCheckRegister(check);
        consulClient.agentCheckPass(check.getId());

        NewSession session = new NewSession();
        session.setBehavior(Session.Behavior.RELEASE);
        session.setName("session "+lockName);
        session.setLockDelay(1);
        session.setTtl(ttlSeconds + "s"); // And check ttl Determine the lock duration together 
        List<String> checks = new ArrayList<>();
        checks.add(check.getId());
        session.setChecks(checks);
        String sessionId = consulClient.sessionCreate(session,null).getValue();

        return sessionId;
    }

    private boolean lock(String lockName,String sessionId){
        PutParams putParams = new PutParams();
        putParams.setAcquireSession(sessionId);

        boolean isSuccess = consulClient.setKVValue(lockName,"lock:"+ LocalDateTime.now(),putParams).getValue();

        return isSuccess;
    }

    /**
     *  Objects returned when competing for locks 
     */
    @Data
    public class LockContext{
        /**
         *  Obtaining the lock successfully returns this value, and then uses this value to release the lock later 
         */
        private String session;
        /**
         *  Whether to acquire a lock 
         */
        private boolean isGetLock;
    }
}

pom file


<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.6.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.lyb</groupId>
    <artifactId>consul-lock</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>consul-lock</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
        <spring-cloud.version>Greenwich.SR2</spring-cloud.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-consul-discovery</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.8</version>
            <optional>true</optional>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

Test code:


package com.lyb.consullock;

import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

@RunWith(SpringRunner.class)
@SpringBootTest
public class ConsulLockApplicationTests {
    @Autowired
    private ServiceConfig serviceConfig;
    @Test
    public void lockSameResourer() {
        // For the same resource in the same 1 The moment is only 1 Three threads will acquire the lock 
        ExecutorService threadPool = Executors.newFixedThreadPool(10);
        for (int a=0;a<20;a++){
            threadPool.submit(
                    () -> {
                        for (int i = 0;i < 100; i++) {
                            DistributedLock lock = new DistributedLock(
                                    serviceConfig.getConsulRegisterHost(),
                                    serviceConfig.getConsulRegisterPort());

                            DistributedLock.LockContext lockContext = lock.getLock("test lock", 10);
                            if (lockContext.isGetLock()) {
                                System.out.println(Thread.currentThread().getName() + " The lock was acquired ");
                                try {
                                    TimeUnit.SECONDS.sleep(1);
                                    lock.releaseLock(lockContext.getSession());
                                } catch (InterruptedException e) {
                                    e.printStackTrace();
                                }
                            }else {
                                //System.out.println(Thread.currentThread().getName() + " No lock acquired ");
                            }
                        }
                    });
        }

        try {
            TimeUnit.MINUTES.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Test
    public void lockDiffResource(){
        // All threads should be able to acquire locks for impassable resources 
        ExecutorService threadPool = Executors.newFixedThreadPool(10);
        for (int a=0;a<20;a++){
            threadPool.submit(
                    () -> {
                        for (int i = 0;i < 100; i++) {
                            DistributedLock lock = new DistributedLock(
                                    serviceConfig.getConsulRegisterHost(),
                                    serviceConfig.getConsulRegisterPort());

                            DistributedLock.LockContext lockContext = lock.getLock("test lock"+Thread.currentThread().getName(), 10);
                            if (lockContext.isGetLock()) {
                                System.out.println(Thread.currentThread().getName() + " The lock was acquired ");
                                try {
                                    TimeUnit.SECONDS.sleep(1);
                                    lock.releaseLock(lockContext.getSession());
                                } catch (InterruptedException e) {
                                    e.printStackTrace();
                                }
                            }else {
                                //System.out.println(Thread.currentThread().getName() + " No lock acquired ");
                                Assert.assertTrue(lockContext.isGetLock());
                            }
                        }
                    });
        }

        try {
            TimeUnit.MINUTES.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

Project path:

https://github.com/wenwuxianren/consul-lock


Related articles: