Summary of Common Blocking Queues in Java

  • 2021-09-11 20:06:50
  • OfStack

Java blocking queue

The main difference between a blocking queue and an ordinary queue lies in the word "blocking":

Blocking add: when the queue is full, the thread adding an element blocks and is only woken up to perform the add once the queue is no longer full.
Blocking removal: when the queue is empty, the removing thread blocks until the queue is no longer empty, and only then performs the removal.

The most common blocking queues are LinkedBlockingQueue and ArrayBlockingQueue. Both implement the BlockingQueue interface, which defines the core methods a blocking queue must provide:


public interface BlockingQueue<E> extends Queue<E> {
    // Adds an element to the tail of the queue; returns true on success, throws IllegalStateException if the queue is full
    boolean add(E e);
    // Adds an element to the tail of the queue; returns true on success, false if the queue is full
    boolean offer(E e);
    // Blocking add
    void put(E e) throws InterruptedException;
    // Blocking add with a maximum wait time
    boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;
    // Blocking removal of the queue head element
    E take() throws InterruptedException;
    // Blocking removal of the queue head element, with a maximum wait time
    E poll(long timeout, TimeUnit unit) throws InterruptedException;
    // Returns how many elements can still be added without blocking
    int remainingCapacity();
    // Removes the element if present; returns true on success, false otherwise
    boolean remove(Object o);
    // Returns whether the queue contains the given element
    public boolean contains(Object o);
    // Removes elements in batches and adds them to the given collection
    int drainTo(Collection<? super E> c);
    // Batch removal with a maximum number of elements
    int drainTo(Collection<? super E> c, int maxElements);
}

In addition to the methods above, three methods inherited from the Queue interface are also frequently used:


// Gets the queue head element without removing it; throws NoSuchElementException if the queue is empty
E element();
// Gets the queue head element without removing it; returns null if the queue is empty
E peek();
// Gets and removes the queue head element; returns null if the queue is empty
E poll();

According to specific functions, methods can be divided into the following three categories:

Add methods: add() returns true on success and throws an exception on failure; offer() returns true on success and false on failure, and has an overload with a maximum wait time; put() is the blocking method.
Remove methods: remove() returns true on success and false on failure; poll() returns the removed element on success and null otherwise; take() is the blocking method.
Query methods: element() returns the head element or throws an exception if the queue is empty; peek() returns the head element or null.

Grouped by behavior, the methods are either blocking or non-blocking: put() and take() are blocking methods, the offer() and poll() overloads with a maximum wait time are also blocking, and the rest are non-blocking. The blocking queues are built on these methods.
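
As a quick illustration of how the non-blocking variants behave (a minimal sketch; the class name and the commented-out calls are just for this example):


import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class MethodFamiliesDemo {
    public static void main(String[] args) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);

        System.out.println(queue.offer("a"));   // true, the queue is now full
        System.out.println(queue.offer("b"));   // false, does not block
        // queue.add("b");                      // would throw IllegalStateException
        // queue.put("b");                      // would block until space is free

        System.out.println(queue.peek());       // "a", not removed
        System.out.println(queue.poll());       // "a", removed
        System.out.println(queue.poll());       // null, the queue is empty
        // queue.element();                     // would throw NoSuchElementException
        // queue.take();                        // would block until an element arrives
    }
}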

ArrayBlockingQueue is based on an array and satisfies the first-in-first-out property of a queue. Let's get a first impression through a piece of code:


import java.util.concurrent.ArrayBlockingQueue;

public class ArrayBlockingQueueTest {

    ArrayBlockingQueue<TestProduct> queue = new ArrayBlockingQueue<TestProduct>(1);

    public static void main(String[] args) {
        ArrayBlockingQueueTest test = new ArrayBlockingQueueTest();
        new Thread(test.new Product()).start();
        new Thread(test.new Customer()).start();
    }

    class Product implements Runnable {
        @Override
        public void run() {
            while (true) {
                try {
                    queue.put(new TestProduct());
                    System.out.println(" Producers create products and wait for consumers to consume them ");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    class Customer implements Runnable {
        @Override
        public void run() {
            while (true) {
                try {
                    Thread.sleep(1000);
                    queue.take();
                    System.out.println(" Consumers consume products waiting for producers to create them ");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    class TestProduct {
    }

}

The above code is straightforward: with a blocking queue of capacity 1, the producer and the consumer take turns blocking because of the capacity limit.

ArrayBlockingQueue is implemented with a ReentrantLock and Condition wait queues, so it supports fair and unfair modes. In fair mode, blocked threads acquire the lock in the order in which they blocked; in unfair mode, already-waiting threads compete with threads that have just arrived, and whichever grabs the lock first proceeds. The unfair mode is the default because it is more efficient:


public ArrayBlockingQueue(int capacity) {
    this(capacity, false);
}
public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}
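
For example, the fair mode can be requested explicitly through the two-argument constructor (a minimal usage sketch; the class name is just for illustration):


import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class FairnessDemo {
    public static void main(String[] args) {
        // Unfair (default): better throughput, no FIFO guarantee for blocked threads
        BlockingQueue<String> unfair = new ArrayBlockingQueue<>(16);
        // Fair: blocked threads acquire the lock in the order in which they blocked
        BlockingQueue<String> fair = new ArrayBlockingQueue<>(16, true);
        System.out.println(unfair.remainingCapacity() + " / " + fair.remainingCapacity());
    }
}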

As you can see from the code, ArrayBlockingQueue is implemented with one ReentrantLock lock and two Condition wait queues, and its properties are as follows:


public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {
    // Array holding the data
    final Object[] items;
    // Index at which the next element is removed
    int takeIndex;
    // Index at which the next element is added
    int putIndex;
    // Number of elements
    int count;
    // Lock used for concurrency control
    final ReentrantLock lock;
    // Condition "not empty", used by take()
    private final Condition notEmpty;
    // Condition "not full", used by put()
    private final Condition notFull;
    // Shared state of active iterators
    transient Itrs itrs = null;
}

As the code shows, ArrayBlockingQueue uses a single lock; removals and additions are tracked by the array indices takeIndex and putIndex, which mark the head and tail of the queue. The take() and put() methods block via the two Condition wait queues. Let's go straight to the source code:


public boolean add(E e) {
    if (offer(e))
        return true;
    else
        throw new IllegalStateException("Queue full");
}
public boolean offer(E e) {
    // Null check: the element must not be null
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // Check whether the queue is full
        if (count == items.length)
            return false;
        else {
            enqueue(e);
            return true;
        }
    } finally {
        lock.unlock();
    }
}
private void enqueue(E x) {
    final Object[] items = this.items;
    // Store the element
    items[putIndex] = x;
    // Wrap around to reuse the array space
    if (++putIndex == items.length)
        putIndex = 0;
    count++;
    // Wake up a thread blocked in take()
    notEmpty.signal();
}

As the code shows, add() is implemented on top of offer(): when offer() fails and returns false, add() throws an exception. offer() takes the lock to guarantee thread safety and, when the queue is not full, enqueues the element by writing into the array and wrapping the index around to reuse the space. After the element has been added, signal() is called on notEmpty to wake up a thread blocked waiting to remove an element. Finally, let's look at the put() method:


public void put(E e) throws InterruptedException {
    // Null check: the element must not be null
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        // While the queue is full, wait on the notFull condition
        while (count == items.length)
            notFull.await();
        enqueue(e);
    } finally {
        lock.unlock();
    }
}

As you can see from the code, when the queue is full the current thread is suspended on the wait queue, and it performs the add only after being woken up once the queue is no longer full. Now let's look at the removal operations:


public boolean remove(Object o) {
    // Null check
    if (o == null) return false;
    final Object[] items = this.items;
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        if (count > 0) {
            final int putIndex = this.putIndex;
            int i = takeIndex;
            // Traverse from the removal index (takeIndex) to the insertion index (putIndex)
            do {
                if (o.equals(items[i])) {
                    removeAt(i);
                    return true;
                }
                // Wrap around: takeIndex may be greater than putIndex when putIndex has already wrapped to a second pass over the array
                if (++i == items.length)
                    i = 0;
            } while (i != putIndex);
        }
        return false;
    } finally {
        lock.unlock();
    }
}
void removeAt(final int removeIndex) {
    final Object[] items = this.items;
    // The element to delete is exactly at the removal index
    if (removeIndex == takeIndex) {
        items[takeIndex] = null;
        // Wrap the removal index around
        if (++takeIndex == items.length)
            takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued();
    } else {
        final int putIndex = this.putIndex;
        // Otherwise shift every element between removeIndex and putIndex one position to the left
        for (int i = removeIndex;;) {
            int next = i + 1;
            if (next == items.length)
                next = 0;
            if (next != putIndex) {
                // Shift left
                items[i] = items[next];
                i = next;
            } else {
                // Clear the last slot and move putIndex back
                items[i] = null;
                this.putIndex = i;
                break;
            }
        }
        count--;
        // Update iterators
        if (itrs != null)
            itrs.removedAt(removeIndex);
    }
    notFull.signal();
}

remove() differs from poll() and take() in that it deletes a specified element, so it has to handle the case where that element is not the one at the removal index. As the code shows, in that case every element between the deleted element's index and the insertion index is shifted one position to the left.


public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return (count == 0) ? null : dequeue();
    } finally {
        lock.unlock();
    }
}
private E dequeue() {
    final Object[] items = this.items;
    E x = (E) items[takeIndex];
    items[takeIndex] = null;
    if (++takeIndex == items.length)
        takeIndex = 0;
    count--;
    if (itrs != null)
        itrs.elementDequeued();
    // After removing an element, wake up a thread blocked in put()
    notFull.signal();
    return x;
}

Compared with remove(), the poll() method is much simpler, so we won't go into details here. Let's look at take():


public E take() throws InterruptedException {
	final ReentrantLock lock = this.lock;
	lock.lockInterruptibly();
	try {
		//  Suspend when the queue is empty 
	    while (count == 0)
	        notEmpty.await();
	    return dequeue();
	} finally {
	    lock.unlock();
	}
}

The take() method essentially mirrors put() and is straightforward. Finally, let's look at the two query methods:


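The sketch below follows the JDK 8 implementation (peek() lives in ArrayBlockingQueue, element() is inherited from AbstractQueue); exact details may differ between JDK versions:


public E peek() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // Returns null when the queue is empty, because the slot at takeIndex is null
        return itemAt(takeIndex);
    } finally {
        lock.unlock();
    }
}
final E itemAt(int i) {
    return (E) items[i];
}
// Inherited from AbstractQueue
public E element() {
    E x = peek();
    if (x != null)
        return x;
    else
        throw new NoSuchElementException();
}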

element() is implemented on top of peek(): when the queue is empty, peek() returns null and element() throws an exception. That is all for ArrayBlockingQueue.

LinkedBlockingQueue is implemented based on a linked list and has the following properties:


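The sketch below follows the JDK 8 field declarations (comments added; details may differ between versions):


public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {
    // Elements are wrapped in nodes of a singly linked list
    static class Node<E> {
        E item;
        Node<E> next;
        Node(E x) { item = x; }
    }
    // Capacity bound; Integer.MAX_VALUE if none is given
    private final int capacity;
    // Current number of elements; atomic because add and remove use different locks
    private final AtomicInteger count = new AtomicInteger();
    // Head of the list; its item is always null
    transient Node<E> head;
    // Tail of the list
    private transient Node<E> last;
    // Lock held by take(), poll(), etc.
    private final ReentrantLock takeLock = new ReentrantLock();
    // Wait queue for blocked removal threads
    private final Condition notEmpty = takeLock.newCondition();
    // Lock held by put(), offer(), etc.
    private final ReentrantLock putLock = new ReentrantLock();
    // Wait queue for blocked adding threads
    private final Condition notFull = putLock.newCondition();
}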

As the code shows, elements are wrapped in Node objects and stored in a singly linked list. The default capacity is Integer.MAX_VALUE, so watch out for memory exhaustion: if elements are added faster than they are removed, the queue keeps a growing number of objects that cannot be garbage collected, eventually causing an out-of-memory error.
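
To guard against this, a capacity can be passed to the constructor so that producers block or fail instead of growing the queue without limit (a minimal sketch; the class name is just for illustration):


import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class BoundedLinkedQueueDemo {
    public static void main(String[] args) {
        // Effectively unbounded: capacity defaults to Integer.MAX_VALUE
        BlockingQueue<byte[]> unbounded = new LinkedBlockingQueue<>();
        // Bounded: put() blocks and offer() returns false once 10_000 elements are queued
        BlockingQueue<byte[]> bounded = new LinkedBlockingQueue<>(10_000);
        System.out.println(bounded.remainingCapacity()); // 10000
    }
}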

The main difference between ArrayBlockingQueue and LinkedBlockingQueue lies in the number of ReentrantLock locks and Condition wait queues: LinkedBlockingQueue uses two locks and two wait queues, which means additions and removals can run concurrently and overall throughput is higher. Let's look directly at the code:


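The sketch below follows the JDK 8 implementation of offer() and its helpers (comments added; details may differ between versions), which the points that follow refer to:


public boolean offer(E e) {
    if (e == null) throw new NullPointerException();
    final AtomicInteger count = this.count;
    // Fast path: check the count before taking the lock
    if (count.get() == capacity)
        return false;
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        // Check again under the lock, the count may have changed in the meantime
        if (count.get() < capacity) {
            enqueue(node);
            c = count.getAndIncrement();
            // Still not full: wake up another waiting put thread
            if (c + 1 < capacity)
                notFull.signal();
        }
    } finally {
        putLock.unlock();
    }
    // The queue was empty before this add: wake up a waiting take thread
    if (c == 0)
        signalNotEmpty();
    return c >= 0;
}
private void enqueue(Node<E> node) {
    last = last.next = node;
}
private void signalNotEmpty() {
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
}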

There are a few points worth noting here:

1. The count field of LinkedBlockingQueue must be an atomic class, because an adding thread and a removing thread may run at the same time under different locks, so its updates have to be thread-safe.

2. count is checked twice in offer(): the first check happens before the lock is taken, so the value may change before the lock is acquired, and it has to be checked again once the lock is held.

3. Unlike ArrayBlockingQueue, LinkedBlockingQueue wakes up another adding thread when the queue is still not full after an add. It can do this because additions and removals use different locks; each side only manages its own waiters, which improves throughput. ArrayBlockingQueue uses a single lock and can only wake either a removal thread or an addition thread, so its throughput is lower.

4. A removal thread is woken only when the queue length was 0 before the element was added (c == 0), because in that case removal threads must be blocked; one of them is signalled and, like adding threads, removal threads cascade the wake-up themselves. When c > 0 there are only two possibilities: either a removal thread is already running, in which case it will signal the next one without our help, or no removal thread is blocked, in which case there is nothing to wake and later take()/poll() calls simply proceed.

5. Only threads blocked in put() and take() (or the timed overloads) need to be woken; the plain offer() returns immediately and never takes part in the wake-up scheme.

Let's look at the implementation of the put () blocking method:


public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly();
    try {
        // Block while the queue is full
        while (count.get() == capacity) {
            notFull.await();
        }
        // Enqueue the node
        enqueue(node);
        c = count.getAndIncrement();
        // If the queue is still not full, wake up another waiting put thread
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    // The queue was empty before this add: wake up a waiting take thread
    if (c == 0)
        signalNotEmpty();
}

As the code shows, the only real difference between put() and offer() is that put() suspends itself on the condition wait queue when the queue is full; the rest is essentially the same. Having covered the add operations, let's look at the removal methods:


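The sketch below follows the JDK 8 implementation of remove() and its helpers (comments added; details may differ between versions):


public boolean remove(Object o) {
    if (o == null) return false;
    // Both locks are needed because the node may be anywhere in the list
    fullyLock();
    try {
        for (Node<E> trail = head, p = trail.next;
             p != null;
             trail = p, p = p.next) {
            if (o.equals(p.item)) {
                unlink(p, trail);
                return true;
            }
        }
        return false;
    } finally {
        fullyUnlock();
    }
}
void unlink(Node<E> p, Node<E> trail) {
    p.item = null;
    trail.next = p.next;
    if (last == p)
        last = trail;
    // Wake up a put thread only if the queue was full before this removal
    if (count.getAndDecrement() == capacity)
        notFull.signal();
}
void fullyLock() {
    putLock.lock();
    takeLock.lock();
}
void fullyUnlock() {
    takeLock.unlock();
    putLock.unlock();
}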

As the code shows, remove() wakes up an adding thread only when the queue was at full capacity before the removal; it never wakes removal threads. And because we cannot know where in the list the element to delete sits, both locks have to be held here to keep the data safe.


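The sketch below follows the JDK 8 implementation of poll() and dequeue() (comments added; details may differ between versions):


public E poll() {
    final AtomicInteger count = this.count;
    if (count.get() == 0)
        return null;
    E x = null;
    int c = -1;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        if (count.get() > 0) {
            x = dequeue();
            c = count.getAndDecrement();
            // Still elements left: wake up another waiting take thread
            if (c > 1)
                notEmpty.signal();
        }
    } finally {
        takeLock.unlock();
    }
    // The queue was full before this removal: wake up a waiting put thread
    if (c == capacity)
        signalNotFull();
    return x;
}
private E dequeue() {
    Node<E> h = head;
    Node<E> first = h.next;
    h.next = h; // help GC
    head = first;        // first becomes the new, item-less head node
    E x = first.item;
    first.item = null;
    return x;
}
private void signalNotFull() {
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        notFull.signal();
    } finally {
        putLock.unlock();
    }
}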

One point to note is that the head node is replaced on every dequeue. The head node itself holds no data; head.next is the element to be dequeued next. On each dequeue, head.next becomes the new head node, its item is returned, and the item reference is then set to null.

The poll() method basically mirrors the offer() method described above, so we won't go into details here.


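The sketch below follows the JDK 8 implementation of take() (comments added; details may differ between versions):


public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        // Block while the queue is empty
        while (count.get() == 0) {
            notEmpty.await();
        }
        x = dequeue();
        c = count.getAndDecrement();
        // Still elements left: wake up another waiting take thread
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    // The queue was full before this removal: wake up a waiting put thread
    if (c == capacity)
        signalNotFull();
    return x;
}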

The take() method is similar to poll() except for the added blocking logic. That covers the removal methods. Finally, let's look at the query method source code:


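The sketch below follows the JDK 8 constructor and peek() (comments added; details may differ between versions):


public LinkedBlockingQueue(int capacity) {
    if (capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity;
    // head and last start out pointing at the same sentinel node whose item is null
    last = head = new Node<E>(null);
}
public E peek() {
    if (count.get() == 0)
        return null;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        // The head node holds no data; the first real element is head.next
        Node<E> first = head.next;
        if (first == null)
            return null;
        else
            return first.item;
    } finally {
        takeLock.unlock();
    }
}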

As the code shows, head and last initially point to a sentinel node whose item is null, and reads start from head.next, which means the head node itself never stores data.

Finally, let's look at the offer () method with the maximum wait time:


public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    // Convert the timeout into nanoseconds
    long nanos = unit.toNanos(timeout);
    int c = -1;
    // The lock used by add operations
    final ReentrantLock putLock = this.putLock;
    // The current queue size
    final AtomicInteger count = this.count;
    // Lock interruptibly
    putLock.lockInterruptibly();
    try {
        while (count.get() == capacity) {
            // <= 0 means the maximum wait time has elapsed
            if (nanos <= 0)
                return false;
            // The queue is full: wait on the condition for at most the remaining time
            nanos = notFull.awaitNanos(nanos);
        }
        // The queue is not full: enqueue directly
        enqueue(new Node<E>(e));
        c = count.getAndIncrement();
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
    return true;
}
public final long awaitNanos(long nanosTimeout) throws InterruptedException {
    if (Thread.interrupted()) throw new InterruptedException();
    // Wrap the current thread in an AQS Node and add it to the condition wait queue
    Node node = addConditionWaiter();
    // Release the lock completely
    int savedState = fullyRelease(node);
    // Compute the deadline
    final long deadline = System.nanoTime() + nanosTimeout;
    int interruptMode = 0;
    // Loop while the node has not been moved to the synchronization queue (i.e. not yet signalled)
    while (!isOnSyncQueue(node)) {
        // The wait time has elapsed: cancel the wait, transfer the node, and exit the loop
        if (nanosTimeout <= 0L) {
            transferAfterCancelledWait(node);
            break;
        }
        // Only park if the remaining time is above the spin threshold
        if (nanosTimeout >= spinForTimeoutThreshold)
            // Suspend the thread for at most the remaining time
            LockSupport.parkNanos(this, nanosTimeout);
        // Check whether the thread was interrupted while waiting
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
        // Recompute the remaining wait time
        nanosTimeout = deadline - System.nanoTime();
    }
    // After being woken, reacquire the lock (spinning in the sync queue) and check for interruption
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null)
        // Clean up waiters that are no longer in CONDITION state
        unlinkCancelledWaiters();
    // Handle interruption if any
    if (interruptMode != 0)
        // Either throw InterruptedException (interrupted before being signalled) or re-interrupt the thread (interrupted after)
        reportInterruptAfterWait(interruptMode);
    // Return the remaining time: greater than 0 if woken before the deadline, otherwise 0 or negative
    return deadline - System.nanoTime();
}

As the code shows, the timed offer() and poll() methods park the thread on the condition wait queue and loop to check whether the timeout has elapsed; they return either when woken up and able to proceed, or when the maximum wait time is reached.

Comparison between ArrayBlockingQueue and LinkedBlockingQueue:

Size: one is bounded and one is effectively unbounded. ArrayBlockingQueue must be given a capacity up front; LinkedBlockingQueue defaults to Integer.MAX_VALUE and can exhaust memory.
Storage: one uses an array, the other a linked list. The array stores elements directly without allocating extra objects, while the linked list must create a Node object for every element.
Locking: ArrayBlockingQueue uses a single lock for both add and remove, so the two operations cannot run concurrently; LinkedBlockingQueue uses separate locks for add and remove, so they can run concurrently and its overall throughput is higher.
