Java Multithreading Synchronization Tool Class Exchanger

  • 2021-12-04 18:46:21
  • OfStack

Contents
1 Introduction to Exchanger
2 Exchanger instance
  exchange wait timeout
3 Implementation principle

1 Introduction to Exchanger

Having covered CyclicBarrier, CountDownLatch, and Semaphore, we now turn to the last of the concurrency tool classes: Exchanger.
Exchanger is a tool class for inter-thread collaboration that exchanges data between two threads. It provides a synchronization point at which the two threads swap data by calling the exchange method. If the first thread calls exchange first, it waits until the second thread also calls exchange; once both have reached the synchronization point, each thread receives the other's data.

A synchronization point at which threads can pair and swap elements within pairs. Each thread presents some object on entry to the exchange method, matches with a partner thread, and receives its partner's object on return. An Exchanger may be viewed as a bidirectional form of a SynchronousQueue. Exchangers may be useful in applications such as genetic algorithms and pipeline designs.

The description above, taken from the class Javadoc, makes several key points:

  • The operations the class exposes are synchronized;
  • It is used to exchange data between threads that arrive in pairs;
  • It can be regarded as a bidirectional synchronous queue;
  • It can be applied to scenarios such as genetic algorithms and pipeline designs.

According to the API documentation, the class exposes a very simple interface: a no-argument constructor and two overloaded generic exchange methods:

public V exchange(V x) throws InterruptedException
public V exchange(V x, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException
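
The "bidirectional SynchronousQueue" comparison can be made concrete with a small sketch. The class name HandoffComparisonSketch, its two methods, and the string values are hypothetical and chosen only for illustration. With a SynchronousQueue, data flows one way from put to take; with an Exchanger, both sides give a value and both receive one.

```java
import java.util.concurrent.Exchanger;
import java.util.concurrent.SynchronousQueue;

// Illustrative sketch only; names and values are assumptions, not part of the article.
public class HandoffComparisonSketch {

    // One-way handoff: put() blocks until another thread calls take().
    public static String oneWay() throws InterruptedException {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        Thread giver = new Thread(() -> {
            try {
                queue.put("from-giver");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        giver.start();
        String received = queue.take(); // only this side receives a value
        giver.join();
        return received;
    }

    // Two-way handoff: each exchange() call gives one value and returns the partner's.
    public static String[] twoWay() throws InterruptedException {
        Exchanger<String> exchanger = new Exchanger<>();
        String[] partnerGot = new String[1];
        Thread partner = new Thread(() -> {
            try {
                partnerGot[0] = exchanger.exchange("from-partner");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        partner.start();
        String mainGot = exchanger.exchange("from-main");
        partner.join(); // join() makes partnerGot[0] visible to this thread
        return new String[] { mainGot, partnerGot[0] };
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(oneWay());
        String[] both = twoWay();
        System.out.println(both[0] + " / " + both[1]);
    }
}
```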

2 Exchanger instance


import java.util.concurrent.Exchanger;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ExchangerTest {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newCachedThreadPool();
        // Both tasks share one Exchanger so that they can pair up.
        final Exchanger<String> exchanger = new Exchanger<>();
        executor.execute(new Runnable() {
            String data = "data1";

            @Override
            public void run() {
                doExchangeWork(data, exchanger);
            }
        });

        executor.execute(new Runnable() {
            String data = "data2";

            @Override
            public void run() {
                doExchangeWork(data, exchanger);
            }
        });
        executor.shutdown();
    }

    private static void doExchangeWork(String data, Exchanger<String> exchanger) {
        try {
            System.out.println(Thread.currentThread().getName() + " is exchanging data " + data);
            // Simulate some work before reaching the synchronization point.
            Thread.sleep((long) (Math.random() * 1000));

            // Blocks until the partner thread also calls exchange().
            String exchangeData = exchanger.exchange(data);
            System.out.println(Thread.currentThread().getName() + " exchange results in data " + exchangeData);
        } catch (InterruptedException e) {
            // Restore the interrupt flag for the thread pool.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }
}

pool-1-thread-1 is exchanging data data1
pool-1-thread-2 is exchanging data data2
pool-1-thread-2 exchange results in data data1
pool-1-thread-1 exchange results in data data2

When thread A calls the exchange() method of the Exchanger object, it blocks until thread B also calls exchange(). The two threads then swap data in a thread-safe manner, after which both A and B continue to run.

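exchange wait timeout

In this variant the second thread sleeps for 3 seconds before calling exchange, while each exchange call waits at most 1 second. The first thread therefore times out before its partner arrives, and the second thread then times out as well because no partner is left waiting.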


import java.util.concurrent.Exchanger;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class ExchangerTest {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newCachedThreadPool();
        final Exchanger<String> exchanger = new Exchanger<>();
        executor.execute(new Runnable() {
            String data = "data1";

            @Override
            public void run() {
                doExchangeWork(data, exchanger);
            }
        });

        executor.execute(new Runnable() {
            String data = "data2";

            @Override
            public void run() {
                try {
                    // Arrive late: 3 seconds after the first thread.
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    e.printStackTrace();
                }
                doExchangeWork(data, exchanger);
            }
        });
        executor.shutdown();
    }

    private static void doExchangeWork(String data, Exchanger<String> exchanger) {
        try {
            System.out.println(Thread.currentThread().getName() + " is exchanging data " + data);

            // Wait at most 1 second, far less than the partner's 3-second delay,
            // so a TimeoutException is thrown.
            String exchangeData = exchanger.exchange(data, 1, TimeUnit.SECONDS);
            System.out.println(Thread.currentThread().getName() + " exchange results in data " + exchangeData);
        } catch (TimeoutException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }
}

pool-1-thread-1 is exchanging data data1
java.util.concurrent.TimeoutException
at java.util.concurrent.Exchanger.exchange(Exchanger.java:626)
at ExchangerTest.doExchangeWork(ExchangerTest.java:37)
at ExchangerTest.access$000(ExchangerTest.java:3)
at ExchangerTest$1.run(ExchangerTest.java:12)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
pool-1-thread-2 is exchanging data data2
java.util.concurrent.TimeoutException
at java.util.concurrent.Exchanger.exchange(Exchanger.java:626)
at ExchangerTest.doExchangeWork(ExchangerTest.java:37)
at ExchangerTest.access$000(ExchangerTest.java:3)
at ExchangerTest$2.run(ExchangerTest.java:26)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

Practical scenario:

Suppose a scheduled task runs every morning and starts two threads. One thread queries the business list (xxx_info), computes statistics, and puts the results into an in-memory buffer; the other thread reads the results from the buffer and inserts them into the business statistics table (xxx_statistics).
Does this scenario sound familiar? Two threads are handing data to each other in bulk through memory, which is exactly the kind of job Exchanger can do.
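
A minimal sketch of this scenario follows, under several assumptions: the class name StatisticsExchangeSketch, the batch sizes, and the "stat-…" strings are hypothetical, and a plain in-memory list stands in for the xxx_statistics table because the article shows no database code. The producer fills a buffer and swaps it for the consumer's empty one, so the two lists are reused in turn.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Exchanger;

// Illustrative sketch only; all names and sizes are assumptions.
public class StatisticsExchangeSketch {
    static final int BATCHES = 3;
    static final int BATCH_SIZE = 2;

    public static List<String> run() throws InterruptedException {
        Exchanger<List<String>> exchanger = new Exchanger<>();
        // Stand-in for the xxx_statistics table; only the consumer thread writes to it.
        List<String> written = new ArrayList<>();

        Thread producer = new Thread(() -> {
            List<String> buffer = new ArrayList<>();
            try {
                for (int batch = 0; batch < BATCHES; batch++) {
                    // Stand-in for querying xxx_info and computing statistics.
                    for (int i = 0; i < BATCH_SIZE; i++) {
                        buffer.add("stat-" + batch + "-" + i);
                    }
                    // Hand over the full buffer and receive an empty one back.
                    buffer = exchanger.exchange(buffer);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread consumer = new Thread(() -> {
            List<String> buffer = new ArrayList<>();
            try {
                for (int batch = 0; batch < BATCHES; batch++) {
                    // Give the empty buffer away and receive a full one.
                    buffer = exchanger.exchange(buffer);
                    written.addAll(buffer); // stand-in for the database insert
                    buffer.clear();         // empty it before offering it again
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        producer.join();
        consumer.join();
        return written;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run());
    }
}
```

Both threads perform the same number of exchange calls, so neither is left waiting for a partner that never arrives.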

3 Implementation principle

As described above, Exchanger provides a synchronization point at which two threads hand the data each has produced to the other. A thread that calls exchange first waits until a second thread also calls exchange; the two are then paired and the swap takes place. The key to using Exchanger is therefore that threads call exchange() in pairs: whenever a pair of threads reaches the synchronization point, data is exchanged between them.
The Exchanger class provides two methods:

  • V exchange(V x): starts an exchange and waits for another thread to call exchange;
  • V exchange(V x, long timeout, TimeUnit unit): starts an exchange and waits for another thread to call exchange, but only up to the given maximum wait time; if that time elapses first, the thread stops waiting and a TimeoutException is thrown.
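
One way a caller might handle the timed variant is to fall back to a default value instead of printing a stack trace. The sketch below assumes a hypothetical helper, exchangeOrDefault, which is not part of the Exchanger API; it only wraps the timed exchange call.

```java
import java.util.concurrent.Exchanger;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Illustrative sketch only; exchangeOrDefault is a hypothetical helper.
public class TimedExchangeSketch {

    // Returns the partner's value, or fallback if no partner arrives in time.
    public static <V> V exchangeOrDefault(Exchanger<V> exchanger, V mine, V fallback,
                                          long timeoutMillis) throws InterruptedException {
        try {
            return exchanger.exchange(mine, timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // The wait exceeded the timeout, so stop waiting and use the fallback.
            return fallback;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Exchanger<String> exchanger = new Exchanger<>();
        // No partner thread exists here, so the call gives up after the timeout.
        String result = exchangeOrDefault(exchanger, "data1", "no-partner", 200);
        System.out.println(result);
    }
}
```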

