Analysis of the implementation principle and application scenarios of blocking queues in Java

  • 2020-04-01 04:28:54
  • OfStack

Some of the queues we commonly use, such as PriorityQueue and LinkedList, are non-blocking queues (LinkedList is a doubly linked list that implements the Deque interface).
One big problem with non-blocking queues is that they never block the current thread, so in a producer-consumer model you have to implement your own synchronization and inter-thread wake-up logic, which is cumbersome. A blocking queue is different: it can block the current thread. For example, if a thread takes an element from an empty blocking queue, the thread is blocked until the queue contains an element. Once an element arrives, the blocked thread is awakened automatically (we don't need to write any wake-up code). This is a great convenience.
I. Several major blocking queues

Since Java 1.5, several blocking queues have been provided under the java.util.concurrent package, mainly the following:

ArrayBlockingQueue: a blocking queue backed by an array. Its capacity must be specified when the ArrayBlockingQueue object is created. You can also choose between fair and non-fair access; the default is non-fair, meaning the thread that has waited longest is not guaranteed to access the queue first.

LinkedBlockingQueue: a blocking queue backed by a linked list. If no capacity is specified when the LinkedBlockingQueue object is created, the capacity defaults to Integer.MAX_VALUE.

PriorityBlockingQueue: the two queues above are first-in, first-out, while PriorityBlockingQueue is not. It orders elements by priority, and the element that leaves the queue is always the one with the highest priority (the least element according to the ordering in use). Note that this is an unbounded blocking queue: there is no limit on its capacity (as you can see from the source code, it has no "not full" condition). Of the first two, ArrayBlockingQueue is always bounded, and LinkedBlockingQueue is bounded by the capacity you give it (Integer.MAX_VALUE by default).

DelayQueue: a delayed blocking queue built on PriorityQueue. An element can be taken from a DelayQueue only after its specified delay has expired. DelayQueue is also unbounded, so inserting data (the producer side) never blocks; only retrieving data (the consumer side) can block.
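The differences between these four queues can be sketched in a few lines. This is only an illustration under stated assumptions: the class name QueueKindsDemo and the element type DelayedTask are made up for this example, and the 100 ms delay is arbitrary.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;

class QueueKindsDemo {

    // Hypothetical element type for DelayQueue: becomes available delayMillis after creation.
    static final class DelayedTask implements Delayed {
        final String name;
        final long readyAtNanos;

        DelayedTask(String name, long delayMillis) {
            this.name = name;
            this.readyAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(readyAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    // ArrayBlockingQueue: the capacity is mandatory; the second argument requests fair access.
    static boolean offerToFullArrayQueue() {
        ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2, true);
        queue.offer(1);
        queue.offer(2);
        return queue.offer(3); // false: the queue is full and offer() does not wait
    }

    // LinkedBlockingQueue: without an explicit capacity the limit is Integer.MAX_VALUE.
    static int defaultLinkedCapacity() {
        return new LinkedBlockingQueue<Integer>().remainingCapacity();
    }

    // PriorityBlockingQueue: elements leave in priority order, not insertion order.
    static int headOfPriorityQueue() {
        PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>();
        queue.put(5);
        queue.put(1);
        queue.put(3);
        return queue.poll(); // 1, the least element under natural ordering
    }

    // DelayQueue: put() never blocks; take() waits until the head's delay has expired.
    static String takeFromDelayQueue() throws InterruptedException {
        DelayQueue<DelayedTask> queue = new DelayQueue<>();
        queue.put(new DelayedTask("job", 100));
        return queue.take().name;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(offerToFullArrayQueue());
        System.out.println(defaultLinkedCapacity() == Integer.MAX_VALUE);
        System.out.println(headOfPriorityQueue());
        System.out.println(takeFromDelayQueue());
    }
}
```
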

Methods in blocking queues vs. methods in non-blocking queues

1. Several main methods in non-blocking queues:

add(E e): inserts element e at the tail of the queue and returns true if the insert succeeds. If the insert fails (that is, the queue is full), an exception is thrown.

remove(): removes and returns the head element. If the removal fails (the queue is empty), an exception is thrown.

offer(E e): inserts element e at the tail of the queue and returns true if the insert succeeds, or false if it fails (that is, the queue is full).

poll(): removes and returns the head element, or returns null if the queue is empty.

peek(): returns the head element without removing it, or returns null if the queue is empty.

For non-blocking queues, offer, poll, and peek are generally recommended over add and remove, because their return values tell you whether the operation succeeded, whereas add and remove report failure only by throwing an exception. Note that none of the methods of these non-blocking queues are synchronized.
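The difference between the exception-throwing methods and the value-returning methods can be seen in a small sketch. The class name QueueMethodsDemo is made up for this example. Because LinkedList is unbounded, add never fails on it, so the "queue full" case is shown here on a capacity-1 ArrayBlockingQueue, which follows the same add contract.

```java
import java.util.LinkedList;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;

class QueueMethodsDemo {

    // remove() reports an empty queue by throwing NoSuchElementException.
    static boolean removeThrowsOnEmpty() {
        Queue<String> queue = new LinkedList<>();
        try {
            queue.remove();
            return false;
        } catch (NoSuchElementException expected) {
            return true;
        }
    }

    // add() reports a full, capacity-restricted queue by throwing IllegalStateException.
    static boolean addThrowsWhenFull() {
        Queue<String> queue = new ArrayBlockingQueue<>(1);
        queue.add("a");
        try {
            queue.add("b");
            return false;
        } catch (IllegalStateException expected) {
            return true;
        }
    }

    // poll() and peek() report an empty queue by returning null.
    static boolean pollAndPeekReturnNullOnEmpty() {
        Queue<String> queue = new LinkedList<>();
        return queue.poll() == null && queue.peek() == null;
    }

    // offer() reports success through its return value; peek() leaves the head in place.
    static String offerPeekPoll() {
        Queue<String> queue = new LinkedList<>();
        boolean accepted = queue.offer("a");
        queue.offer("b");
        String peeked = queue.peek();  // "a", still in the queue
        String polled = queue.poll();  // "a", now removed
        return accepted + "," + peeked + "," + polled + "," + queue.size();
    }

    public static void main(String[] args) {
        System.out.println(removeThrowsOnEmpty());
        System.out.println(addThrowsWhenFull());
        System.out.println(pollAndPeekReturnNullOnEmpty());
        System.out.println(offerPeekPoll()); // prints "true,a,a,1"
    }
}
```
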

2. Main methods of blocking queues:

A blocking queue includes most of the methods of a non-blocking queue. The five methods listed above all exist in a blocking queue, but note that in a blocking queue all five are thread-safe: they are synchronized internally. In addition, blocking queues provide four other very useful methods:

put(E e)
take()
offer(E e, long timeout, TimeUnit unit)
poll(long timeout, TimeUnit unit)

The put method stores an element at the tail of the queue and waits if the queue is full. The take method fetches an element from the head of the queue and waits if the queue is empty. The timed offer method stores an element at the tail of the queue; if the queue is full, it waits up to the given time, returning false if the element still could not be inserted when the time expires and true otherwise. The timed poll method fetches an element from the head of the queue; if the queue is empty, it waits up to the given time, returning null if no element has arrived when the time expires and the retrieved element otherwise.
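A minimal sketch of these four methods follows. The class name TimedQueueDemo and the timeout values are arbitrary choices for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

class TimedQueueDemo {

    // Timed offer on a full queue: waits up to 100 ms, then gives up and returns false.
    static boolean timedOfferOnFullQueue() throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        queue.put("first"); // fills the only slot
        return queue.offer("second", 100, TimeUnit.MILLISECONDS);
    }

    // Timed poll on an empty queue: waits up to 100 ms, then returns null.
    static String timedPollOnEmptyQueue() throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        return queue.poll(100, TimeUnit.MILLISECONDS);
    }

    // take() blocks until another thread puts an element into the queue.
    static String takeWaitsForProducer() throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        Thread producer = new Thread(() -> {
            try {
                Thread.sleep(50);   // the consumer is already waiting by now
                queue.put("late");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        String value = queue.take();
        producer.join();
        return value;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(timedOfferOnFullQueue());
        System.out.println(timedPollOnEmptyQueue());
        System.out.println(takeWaitsForProducer());
    }
}
```
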

The implementation principle of blocking queues

If the queue is empty, the consumer waits. When the producer then adds an element, how does the consumer learn that the queue now contains one? If you had to design a blocking queue yourself, how would you design it so that producers and consumers could communicate efficiently? Let's first look at how the JDK does it.

The JDK uses the notification pattern. When a producer tries to add an element to a full queue, the producer is blocked; when a consumer removes an element from the queue, it notifies the producer that the queue has free space again. The same happens in the other direction for a consumer waiting on an empty queue. Looking at the JDK source code, we find that ArrayBlockingQueue implements this with Condition. The code is as follows:


private final Condition notFull;
private final Condition notEmpty;

public ArrayBlockingQueue(int capacity, boolean fair) {
    //Omit other code
    notEmpty = lock.newCondition();
    notFull = lock.newCondition();
  }

public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
      while (count == items.length)
        notFull.await();
      insert(e);
    } finally {
      lock.unlock();
    }
}

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
      while (count == 0)
        notEmpty.await();
      return extract();
    } finally {
      lock.unlock();
    }
}

private void insert(E x) {
    items[putIndex] = x;
    putIndex = inc(putIndex);
    ++count;
    notEmpty.signal();
  }
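The excerpt above omits the extract side. As a rough, self-contained sketch of the same notification pattern (not the JDK's actual code), a bounded buffer with one lock and two conditions might look like this. The class name BoundedBuffer and its fields are made up for this illustration.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class BoundedBuffer<E> {
    private final Object[] items;
    private int putIndex;
    private int takeIndex;
    private int count;

    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();  // producers wait here
    private final Condition notEmpty = lock.newCondition(); // consumers wait here

    BoundedBuffer(int capacity) {
        items = new Object[capacity];
    }

    void put(E e) throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (count == items.length) {
                notFull.await();        // releases the lock while waiting
            }
            items[putIndex] = e;
            putIndex = (putIndex + 1) % items.length;
            count++;
            notEmpty.signal();          // wake one waiting consumer
        } finally {
            lock.unlock();
        }
    }

    @SuppressWarnings("unchecked")
    E take() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (count == 0) {
                notEmpty.await();
            }
            E e = (E) items[takeIndex];
            items[takeIndex] = null;    // drop the reference so it can be collected
            takeIndex = (takeIndex + 1) % items.length;
            count--;
            notFull.signal();           // wake one waiting producer
            return e;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        BoundedBuffer<Integer> buffer = new BoundedBuffer<>(2);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < 5; i++) {
                    buffer.put(i);      // blocks whenever two elements are pending
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        for (int i = 0; i < 5; i++) {
            System.out.println(buffer.take());
        }
        producer.join();
    }
}
```

Both waits sit inside a while loop rather than an if, so a thread that wakes up rechecks the condition before proceeding.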

When we insert an element into the queue and no space is available, the producer is blocked mainly through LockSupport.park(this), which is called inside the condition's await method:


public final void await() throws InterruptedException {
      if (Thread.interrupted())
        throw new InterruptedException();
      Node node = addConditionWaiter();
      int savedState = fullyRelease(node);
      int interruptMode = 0;
      while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
          break;
      }
      if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
      if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
      if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
    }

Continuing into the source code, we find that park first calls setBlocker to record, on the thread that is about to be blocked, the object it is blocked on, and then calls unsafe.park to block the current thread.


public static void park(Object blocker) {
    Thread t = Thread.currentThread();
    setBlocker(t, blocker);
    unsafe.park(false, 0L);
    setBlocker(t, null);
  }

unsafe.park is a native method, declared as follows:


public native void park(boolean isAbsolute, long time);

The park method blocks the current thread and returns only when one of the following four things happens:

When the unpark corresponding to this park is executed, or has already been executed. ("Has already been executed" means that unpark ran first and park ran afterwards.)
When the thread is interrupted.
When the time parameter is not zero and the specified time has elapsed (a relative time in nanoseconds, or an absolute deadline in milliseconds if isAbsolute is true).
When something unusual happens. The cause of such a spurious return cannot be determined in advance.
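The first two cases can be observed from plain Java through LockSupport. The following is a small sketch with a made-up class name, ParkDemo. Because park may also return for no apparent reason, the worker rechecks its flag in a loop.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

class ParkDemo {

    // unpark() ran before park(): the permit is already available,
    // so park() consumes it and returns immediately instead of blocking.
    static boolean parkReturnsAfterEarlierUnpark() {
        LockSupport.unpark(Thread.currentThread());
        LockSupport.park();
        return true; // reached only because park() returned
    }

    // A pending interrupt makes park() return without throwing an exception.
    static boolean parkReturnsWhenInterrupted() {
        Thread.currentThread().interrupt();
        LockSupport.park();
        return Thread.interrupted(); // true; this call also clears the interrupt flag
    }

    // Typical pairing: a worker parks until another thread sets a flag and unparks it.
    static boolean workerWokenByUnpark() throws InterruptedException {
        AtomicBoolean ready = new AtomicBoolean(false);
        AtomicBoolean sawReady = new AtomicBoolean(false);
        Thread worker = new Thread(() -> {
            while (!ready.get()) {                // loop guards against spurious returns
                LockSupport.park(ParkDemo.class); // the argument is the recorded blocker
            }
            sawReady.set(true);
        });
        worker.start();
        Thread.sleep(50);          // give the worker time to park
        ready.set(true);
        LockSupport.unpark(worker);
        worker.join();
        return sawReady.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(parkReturnsAfterEarlierUnpark());
        System.out.println(parkReturnsWhenInterrupted());
        System.out.println(workerWokenByUnpark());
    }
}
```
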

Let's go on to see how the JVM implements the park method. The implementation differs between operating systems; on Linux it uses the system function pthread_cond_wait. The implementation is the os::PlatformEvent::park method in src/os/linux/vm/os_linux.cpp in the JVM source tree. The code is as follows:


void os::PlatformEvent::park() {
  int v ;
  for (;;) {
    v = _Event ;
    if (Atomic::cmpxchg (v-1, &_Event, v) == v) break ;
  }
  guarantee (v >= 0, "invariant") ;
  if (v == 0) {
    // Do this the hard way by blocking ...
    int status = pthread_mutex_lock(_mutex);
    assert_status(status == 0, status, "mutex_lock");
    guarantee (_nParked == 0, "invariant") ;
    ++ _nParked ;
    while (_Event < 0) {
      status = pthread_cond_wait(_cond, _mutex);
      // for some reason, under 2.7 lwp_cond_wait() may return ETIME ...
      // Treat this the same as if the wait was interrupted
      if (status == ETIME) { status = EINTR; }
      assert_status(status == 0 || status == EINTR, status, "cond_wait");
    }
    -- _nParked ;

    // In theory we could move the ST of 0 into _Event past the unlock(),
    // but then we'd need a MEMBAR after the ST.
    _Event = 0 ;
    status = pthread_mutex_unlock(_mutex);
    assert_status(status == 0, status, "mutex_unlock");
  }
  guarantee (_Event >= 0, "invariant") ;
}

pthread_cond_wait is a condition-variable function from the POSIX threads library; cond is short for condition. Literally, the thread waits for a condition to occur. The function takes two arguments: a shared condition variable (_cond) and a mutex (_mutex). On Linux, the unpark method is implemented using pthread_cond_signal. On Windows, park is implemented using WaitForSingleObject.

When the queue is full and the producer tries to insert an element into the blocking queue, the producer thread enters the WAITING (parking) state. We can see this by dumping the blocked producer thread with jstack:


"main" prio=5 tid=0x00007fc83c000000 nid=0x10164e000 waiting on condition [0x000000010164d000]
  java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for <0x0000000140559fe8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
    at java.util.concurrent.ArrayBlockingQueue.put(ArrayBlockingQueue.java:324)
    at blockingqueue.ArrayBlockingQueueTest.main(ArrayBlockingQueueTest.java:11)
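The same state can also be observed from inside the program with Thread.getState(). The sketch below uses a made-up class name, BlockedProducerState, and an arbitrary five-second upper bound on how long it polls for the state change.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

class BlockedProducerState {

    // Returns the state of a producer thread that is stuck in put() on a full queue.
    static Thread.State stateOfBlockedProducer() throws InterruptedException {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(1);
        queue.put(1); // the queue is now full

        Thread producer = new Thread(() -> {
            try {
                queue.put(2); // parks until a slot becomes free
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "producer");
        producer.start();

        // Poll until the producer has parked (or give up after five seconds).
        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
        Thread.State state = producer.getState();
        while (state != Thread.State.WAITING && System.nanoTime() < deadline) {
            Thread.sleep(10);
            state = producer.getState();
        }

        queue.take();    // frees a slot, which wakes the producer
        producer.join();
        return state;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(stateOfBlockedProducer());
    }
}
```
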

Examples and usage scenarios

First, let's use Object.wait(), Object.notify(), and a non-blocking queue to implement the producer-consumer pattern:


import java.util.PriorityQueue;

public class Test {
  private int queueSize = 10;
  private PriorityQueue<Integer> queue = new PriorityQueue<Integer>(queueSize);
   
  public static void main(String[] args) {
    Test test = new Test();
    Producer producer = test.new Producer();
    Consumer consumer = test.new Consumer();
     
    producer.start();
    consumer.start();
  }
   
  class Consumer extends Thread{
     
    @Override
    public void run() {
      consume();
    }
     
    private void consume() {
      while(true){
        synchronized (queue) {
          while(queue.size() == 0){
            try {
              System.out.println(" Queue empty, waiting for data ");
              queue.wait();
            } catch (InterruptedException e) {
              e.printStackTrace();
              queue.notify();
            }
          }
          queue.poll();     //Remove the header element each time
          queue.notify();
          System.out.println("Took one element from the queue; elements remaining: " + queue.size());
        }
      }
    }
  }
   
  class Producer extends Thread{
     
    @Override
    public void run() {
      produce();
    }
     
    private void produce() {
      while(true){
        synchronized (queue) {
          while(queue.size() == queueSize){
            try {
              System.out.println(" The queue is full, waiting for free space ");
              queue.wait();
            } catch (InterruptedException e) {
              e.printStackTrace();
              queue.notify();
            }
          }
          queue.offer(1);    //Insert one element at a time
          queue.notify();
          System.out.println("Inserted one element into the queue; remaining capacity: " + (queueSize - queue.size()));
        }
      }
    }
  }
}

This is the classic producer-consumer pattern, implemented with a non-blocking queue plus Object.wait() and Object.notify(), which handle the communication between the threads.

The details of how threads communicate with each other (the use of wait and notify) will be covered in a later article.

Here is the producer-consumer pattern implemented using blocking queues:


import java.util.concurrent.ArrayBlockingQueue;

public class Test {
  private int queueSize = 10;
  private ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(queueSize);
   
  public static void main(String[] args) {
    Test test = new Test();
    Producer producer = test.new Producer();
    Consumer consumer = test.new Consumer();
     
    producer.start();
    consumer.start();
  }
   
  class Consumer extends Thread{
     
    @Override
    public void run() {
      consume();
    }
     
    private void consume() {
      while(true){
        try {
          queue.take();
          System.out.println("Took one element from the queue; elements remaining: " + queue.size());
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      }
    }
  }
   
  class Producer extends Thread{
     
    @Override
    public void run() {
      produce();
    }
     
    private void produce() {
      while(true){
        try {
          queue.put(1);
          System.out.println("Inserted one element into the queue; remaining capacity: " + (queueSize - queue.size()));
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      }
    }
  }
}

Notice how much simpler the code is with a blocking queue: there is no need to handle synchronization and inter-thread communication separately.

In concurrent programming, blocking queues are generally recommended, because relying on them helps the program avoid unexpected synchronization errors as far as possible.

The classic scenario for a blocking queue is reading and parsing socket client data: the reading thread continuously puts data into the queue, and the parsing thread continuously takes data from the queue and parses it. Blocking queues fit any similar scenario that follows the producer-consumer model.
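A rough sketch of that read-then-parse pipeline follows. To stay self-contained it does not open a real socket: the incoming data is simulated by a list of strings, and every name here (ReadParsePipeline, the EOF marker, the capacity of 16) is an assumption made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class ReadParsePipeline {

    // End-of-stream marker ("poison pill"); compared by identity, never by content.
    private static final String EOF = new String("EOF");

    // rawLines stands in for the data a reading thread would receive from a socket.
    static List<Integer> run(List<String> rawLines) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(16);
        List<Integer> parsed = new ArrayList<>();

        Thread reader = new Thread(() -> {
            try {
                for (String line : rawLines) {
                    queue.put(line);   // blocks if the parser falls behind
                }
                queue.put(EOF);        // tell the parser that no more data will come
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "reader");

        Thread parser = new Thread(() -> {
            try {
                for (String line = queue.take(); line != EOF; line = queue.take()) {
                    parsed.add(Integer.parseInt(line.trim()));
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "parser");

        reader.start();
        parser.start();
        reader.join();
        parser.join(); // after join, the parser's writes to 'parsed' are visible here
        return parsed;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(Arrays.asList("1", " 2 ", "3"))); // prints [1, 2, 3]
    }
}
```

The bounded capacity gives back-pressure: if parsing is slower than reading, the reader blocks in put instead of filling memory.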

