Summary of Common Blocking Queues in Java
- 2021-09-11 20:06:50
- OfStack
Java blocking queue
The main difference between blocking queue and ordinary queue lies in blocking 2 words:
Blocking addition: When the queue is full, the thread adding elements will block, and the thread will not wake up to perform the addition operation until the queue is full Blocking deletion: When the queue element is empty, the deletion thread will block until the queue is not empty before performing the deletion operationCommon blocking queues are LinkedBlockingQueue and ArrayBlockingQueue, both of which implement the BlockingQueue interface, which defines the core methods to be implemented for blocking queues:
public interface BlockingQueue<E> extends Queue<E> {
// Add elements to the end of the queue and return successfully true Queue full throws an exception IllegalStateException
boolean add(E e);
// Add elements to the end of the queue and return successfully true When the queue is full, return false
boolean offer(E e);
// Blocking addition
void put(E e) throws InterruptedException;
// Blocking addition, including maximum wait time
boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;
// Blocking removal of queue top element
E take() throws InterruptedException;
// Blocking removes the top element of the queue, including the maximum wait time
E poll(long timeout, TimeUnit unit) throws InterruptedException;
// Returns the maximum number that can be added to the queue without blocking
int remainingCapacity();
// Delete if element exists and return successfully true Failure returns false
boolean remove(Object o);
// Whether to contain an element
public boolean contains(Object o);
// Removes elements in batches and adds them to the specified collection
int drainTo(Collection<? super E> c);
// Bulk removal contains maximum quantity
int drainTo(Collection<? super E> c, int maxElements);
}
In addition to the above methods, there are three methods that inherit from the Queue interface that are often used:
// Gets the queue header element, does not delete, does not throw an exception NoSuchElementException
E element();
// Gets the queue header element, does not delete, does not return null
E peek();
// Gets and removes the queue header element, and does not return nul
E poll();
According to specific functions, methods can be divided into the following three categories:
Add element classes: add () successfully returns true, fails to throw an exception, offer () successfully returns true, fails to return false, can define the maximum waiting time, put () blocking method Delete element class: remove () returns true successfully, false fails, poll () returns removed element successfully, null is null, take () blocks method Query element class: element () returns the element successfully, otherwise an exception is thrown, peek () returns the corresponding element or nullAccording to the method type, it can be divided into blocking and non-blocking, in which put () and take () are blocking methods, offer () and poll () with maximum waiting time are also blocking methods, and the rest are non-blocking methods. The blocking queue is realized based on the above methods
ArrayBlockingQueue is based on array implementation, which meets the first-in-first-out characteristics of queues. We have a preliminary understanding through 1 code:
public class ArrayBlockingQueueTest {
ArrayBlockingQueue<TestProduct> queue = new ArrayBlockingQueue<TestProduct>(1);
public static void main(String[] args) {
ArrayBlockingQueueTest test = new ArrayBlockingQueueTest();
new Thread(test.new Product()).start();
new Thread(test.new Customer()).start();
}
class Product implements Runnable {
@Override
public void run() {
while (true) {
try {
queue.put(new TestProduct());
System.out.println(" Producers create products and wait for consumers to consume them ");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
class Customer implements Runnable {
@Override
public void run() {
while (true) {
try {
Thread.sleep(1000);
queue.take();
System.out.println(" Consumers consume products waiting for producers to create them ");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
class TestProduct {
}
}
The above code is relatively simple. In a blocking queue with a capacity of 1, the producer and the consumer are blocked in turn due to capacity constraints.
ArrayBlockingQueue is implemented based on ReentrantLock lock and Condition wait queue, so there are fair and unfair modes. In fair scenario, all blocked threads execute in blocking order, while in unfair scenario, threads in the queue compete with threads just ready to enter the queue, and whoever grabs them is the one who grabs them. Unfair locks are used by default because they are more efficient:
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
As you can see from the code, ArrayBlockingQueue is implemented with one ReentrantLock lock and two Condition wait queues, and its properties are as follows:
public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {
// Array for holding data
final Object[] items;
// Remove the index of an element
int takeIndex;
// Add the index of an element
int putIndex;
// Number of elements
int count;
// Locks for concurrency control
final ReentrantLock lock;
// Is not empty, used for take() Operation
private final Condition notEmpty;
// Dissatisfaction, used for put() Operation
private final Condition notFull;
// Iterator
transient Itrs itrs = null;
}
It can be seen from the code that ArrayBlockingQueue uses the same lock, and the removed elements and added elements are recorded by array subscript, and the queue header and queue tail are represented by table. Blocking the take () and put () methods through two waiting queues, let's look directly at the source code:
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
public boolean offer(E e) {
// Check whether it is empty
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
// Determine whether the queue is full
if (count == items.length)
return false;
else {
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}
private void enqueue(E x) {
final Object[] items = this.items;
// Assign values to save data
items[putIndex] = x;
// Cyclic reuse space
if (++putIndex == items.length)
putIndex = 0;
count++;
// Awaken take Thread
notEmpty.signal();
}
It can be seen from the code that the add () method is implemented based on the offer () method. After the offer () method fails to add and returns false, the add () method throws an exception. The offer () method locks to ensure thread safety and performs queuing operations when the queue is not full by manipulating arrays and by cycling the array space. If the queue is not empty after the element is successfully added, call the signal () method to wake up the blocking thread that removed the element. Finally, let's look at the put () method:
public void put(E e) throws InterruptedException {
// Judgment is not empty
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// When the queue is full, it suspends in the waiting queue
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
As you can see from the code, when the queue is full, the current thread is suspended in the waiting queue until it wakes up to perform the add operation when the queue is full. Let's look at the delete operation:
public boolean remove(Object o) {
// Determine whether it is NULL
if (o == null) return false;
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count > 0) {
final int putIndex = this.putIndex;
int i = takeIndex;
// Traversing from removing the subscript to adding the subscript of the new element
do {
if (o.equals(items[i])) {
removeAt(i);
return true;
}
// Loop judgment, removing subscripts may be greater than adding subscripts (adding subscripts) 2 Sub-iteration duration)
if (++i == items.length)
i = 0;
} while (i != putIndex);
}
return false;
} finally {
lock.unlock();
}
}
void removeAt(final int removeIndex) {
final Object[] items = this.items;
// The element to be deleted happens to be the removal subscript
if (removeIndex == takeIndex) {
items[takeIndex] = null;
// Cyclic deletion
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
} else {
final int putIndex = this.putIndex;
// If the subscript is not removed, all elements move left from the subscript to the subscript added 1 Bit
for (int i = removeIndex;;) {
int next = i + 1;
if (next == items.length)
next = 0;
if (next != putIndex) {
// Remove to the left
items[i] = items[next];
i = next;
} else {
// Finally put Subscript to null
items[i] = null;
this.putIndex = i;
break;
}
}
count--;
// Update iterator
if (itrs != null)
itrs.removedAt(removeIndex);
}
notFull.signal();
}
remove () is different from poll () and take () in that it deletes specified elements. Here, we need to consider the situation that the deleted element is not pointed to by the removal index. As you can see from the code, when the deleted element is not pointed to by the removal index, all elements from the deleted element subscript to the added element subscript are shifted to the left by 1 bit.
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}
private E dequeue() {
final Object[] items = this.items;
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
// Wake up after removing elements put() Add Thread
notFull.signal();
return x;
}
Compared with remove () method, poll () method is much simpler, so we won't go into details here. Let's look at take ():
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// Suspend when the queue is empty
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
The take () method and the put () method can be said to be basically identical and relatively simple. Finally, let's look at two query methods:
// Gets the queue header element, does not delete, does not throw an exception NoSuchElementException
E element();
// Gets the queue header element, does not delete, does not return null
E peek();
// Gets and removes the queue header element, and does not return nul
E poll();
0
element () is implemented based on peek () method. When the queue is empty, peek () method returns null and element () throws an exception. That's all about ArrayBlockingQueue
LinkedBlockingQueue is implemented based on a linked list and has the following properties:
// Gets the queue header element, does not delete, does not throw an exception NoSuchElementException
E element();
// Gets the queue header element, does not delete, does not return null
E peek();
// Gets and removes the queue header element, and does not return nul
E poll();
1
It can be seen from the code that the elements are encapsulated as Node nodes and stored in a unidirectional linked list, where the default length of the linked list is Integer.MAX_VALUE, so pay attention to memory overflow when using it: when the speed of adding elements is faster than that of deleting elements, the queue will eventually record a large number of unused and unrecyclable objects, resulting in memory overflow.
The main difference between ArrayBlockingQueue and LinkedBlockingQueue lies in the number of locks and waiting queues in ReentrantLock. LinkedBlockingQueue uses two locks and two waiting queues, which means that the addition and deletion operations can be executed concurrently, and the overall efficiency is higher. Let's look directly at the code:
// Gets the queue header element, does not delete, does not throw an exception NoSuchElementException
E element();
// Gets the queue header element, does not delete, does not return null
E peek();
// Gets and removes the queue header element, and does not return nul
E poll();
2
Here are the following points that need our attention:
1. LinkedBlockingQueue count attribute must be encapsulated by concurrent class, because there may be concurrent execution of adding and deleting two threads, so synchronization should be considered
2. The main reason why you need to judge twice here is that there is no lock at the beginning of the method, and the value may change, so you need to judge twice after obtaining the lock
3. Unlike ArrayBlockingQueue, LinkedBlockingQueue will wake up the adding thread when the queue is dissatisfied. The reason for this is that the adding and deleting operations in LinkedBlockingQueue use different locks, each of which only needs to manage itself and can improve throughput. ArrayBlockingQueue uses a 1-only lock, which causes the removal thread to never wake up or the addition thread to never wake up, resulting in low throughput
4. Wake up the removal thread when the queue length is 0 before adding elements, because when the queue length is 0, the removal thread must have been suspended, so wake up one removal thread at this time. Because removing threads is similar to adding threads, they will wake themselves up. And c > 0, there are only two situations: There is a removal thread running, if there is one, it will wake up recursively, without our participation, there is no removal thread running, and then there is no need for us to participate, just wait for the take (), poll () methods to be called
5. Wake up only for threads blocked by put () and take () methods, and offer () method returns directly (excluding the maximum waiting time) and does not participate in the wake-up scenario
Let's look at the implementation of the put () blocking method:
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
// Blocking when the queue is full
while (count.get() == capacity) {
notFull.await();
}
// Join the team
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}
As you can see from the code, the only difference between the put () method and the offer () method is that it suspends itself to the wait queue through condition blocking, and the rest are basically the same. Now that we have introduced the addition operation, let's look at the removal method:
// Gets the queue header element, does not delete, does not throw an exception NoSuchElementException
E element();
// Gets the queue header element, does not delete, does not return null
E peek();
// Gets and removes the queue header element, and does not return nul
E poll();
4
As you can see from the code, the remove () method only wakes up the creation thread when the pre-operation capacity is low, not the removal thread. And because we are not sure where to delete the element, we need to add two locks at this time to ensure data security.
// Gets the queue header element, does not delete, does not throw an exception NoSuchElementException
E element();
// Gets the queue header element, does not delete, does not return null
E peek();
// Gets and removes the queue header element, and does not return nul
E poll();
5
One point to note is that the head node is replaced every time you leave the queue. The head node itself does not save data. head. next records the elements that need to leave the queue next time. After each queue, head. next becomes a new head node and returns and is set to null
The poll () method mirrors basically the same as the offer () method mentioned above, so I won't go into details here
// Gets the queue header element, does not delete, does not throw an exception NoSuchElementException
E element();
// Gets the queue header element, does not delete, does not return null
E peek();
// Gets and removes the queue header element, and does not return nul
E poll();
6
The take () method is similar to the poll () method except for the addition of blocking logic. At this point, the overflow element method is introduced. Finally, let's look at the query method source code:
// Gets the queue header element, does not delete, does not throw an exception NoSuchElementException
E element();
// Gets the queue header element, does not delete, does not return null
E peek();
// Gets and removes the queue header element, and does not return nul
E poll();
7
It can be seen from the code that the head and tail nodes of head and last are null by default, and the operation starts directly from next when entering the queue, that is to say, head node does not save data.
Finally, let's look at the offer () method with the maximum wait time:
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
if (e == null) throw new NullPointerException();
// Convert time into nanoseconds
long nanos = unit.toNanos(timeout);
int c = -1;
// Acquisition lock
final ReentrantLock putLock = this.putLock;
// Gets the current queue size
final AtomicInteger count = this.count;
// Interruptible lock
putLock.lockInterruptibly();
try {
while (count.get() == capacity) {
// Less than 0 Indicates that the maximum wait time has been reached
if (nanos <= 0)
return false;
// If the queue is full, block the wait according to the wait queue
nanos = notFull.awaitNanos(nanos);
}
// Enter the queue directly when the queue is not full
enqueue(new Node<E>(e));
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return true;
}
public final long awaitNanos(long nanosTimeout) throws InterruptedException {
if (Thread.interrupted()) throw new InterruptedException();
// Encapsulates the current thread as AQS Node Class joins the wait queue
Node node = addConditionWaiter();
// Release lock
int savedState = fullyRelease(node);
// Calculate the expiration time
final long deadline = System.nanoTime() + nanosTimeout;
int interruptMode = 0;
// When the current thread does not wake up to enter the synchronization queue
while (!isOnSyncQueue(node)) {
// Has waited for the appropriate time, deletes the current node, and sets the status to Off Delete from Queue
if (nanosTimeout <= 0L) {
transferAfterCancelledWait(node);
break;
}
// Determine whether to time out
if (nanosTimeout >= spinForTimeoutThreshold)
// Suspend Thread
LockSupport.parkNanos(this, nanosTimeout);
// Determine whether the thread state is interrupted
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
// Recalculate the remaining wait time
nanosTimeout = deadline - System.nanoTime();
}
// After being awakened, it executes spin operation to obtain lock and judges whether the thread is interrupted or not
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
// Clean up the waiting queue that is not Condition Thread of state
unlinkCancelledWaiters();
// Determine whether it is interrupted
if (interruptMode != 0)
// Throw exception or break thread, throw exception in exclusive mode, break thread in shared mode
reportInterruptAfterWait(interruptMode);
// Return time difference , If the current success time is less than the maximum wait time, the return value is greater than 0 Otherwise, the return value is less than 0
return deadline - System.nanoTime();
}
It can be seen from the code that the offer () and poll () methods with the maximum waiting time are suspended in the waiting queue by cycling to judge whether the time is timed out, and return before being awakened or executed when the maximum waiting time is reached
Comparison between ArrayBlockingQueue and LinkedBlockingQueue:
Different sizes, 1 bounded and 1 unbounded. ArrayBlockingQueue must specify initial size, LinkedBlockingQueue unbounded may overflow memory One uses array and one uses linked list. Array saving does not need to create new objects, but linked list needs to create Node objects The lock mechanism is different. The ArrayBlockingQueue add and delete operation uses the same lock, and the two operations cannot be executed concurrently. LinkedBlockingQueue add and delete use different locks, add and delete operations can be executed concurrently, and the overall efficiency of LinkedBlockingQueue is higher