Java Collections Framework: an analysis of ArrayBlockingQueue

  • 2020-04-01 01:08:19
  • OfStack

The Queue
------------
1.ArrayDeque, (array double-ended queue)
2.PriorityQueue,
3.ConcurrentLinkedQueue, (concurrent queue based on linked list)
4.DelayQueue, (delay blocking queue; the blocking queues in this list implement the BlockingQueue interface)
5.ArrayBlockingQueue, (array-based concurrent blocking queue)
6.LinkedBlockingQueue (FIFO blocking queue based on linked list)
7.LinkedBlockingDeque, (FIFO double-ended blocking queue based on linked list)
8.PriorityBlockingQueue, (unbounded blocking queue with priority)
9.SynchronousQueue (concurrent synchronous blocking queue)
-----------------------------------------------------
ArrayBlockingQueue
A bounded blocking queue backed by an array. This queue orders elements FIFO (first in, first out). The head of the queue is the element that has been in the queue the longest; the tail of the queue is the element that has been in the queue the shortest time. New elements are inserted at the tail of the queue, and retrieval operations obtain elements from the head of the queue.
This is a classic "bounded buffer", in which a fixed-size array holds elements inserted by producers and extracted by consumers. Once such a buffer is created, its capacity cannot be changed. Attempting to put an element into a full queue causes the operation to block; attempting to take an element from an empty queue blocks in the same way.
This class supports an optional fairness policy for ordering waiting producer and consumer threads. By default, this ordering is not guaranteed. However, a queue constructed with fairness set to true grants threads access in FIFO order. Fairness generally decreases throughput, but it reduces variability and avoids starvation.
 
public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { 
 
// Backing array holding the queued elements; allocated once in the constructor with length == capacity.
private final E[] items; 
 
// Index of the element that the next extract() or peek() reads.
private int takeIndex; 
 
// Index of the slot where the next insert() stores its element.
private int putIndex; 
 
// Number of elements currently held in the queue.
private int count; 
 
// Lock acquired by every public operation shown here before it reads or changes the fields above.
private final ReentrantLock lock; 
 
// Condition awaited by take() and timed poll() while count == 0; signalled by insert().
private final Condition notEmpty; 
 
// Condition awaited by put() and timed offer() while the array is full; signalled by extract() and removeAt().
private final Condition notFull; 
// Circular increment: yields i + 1, wrapping to 0 as soon as i + 1 equals items.length.
final int inc(int i) {
    final int next = i + 1;
    if (next == items.length) {
        return 0;
    }
    return next;
}
 
// Stores x in the slot at putIndex, advances putIndex circularly, increments
// count and signals notEmpty. Performs no locking itself; every caller visible
// in this class invokes it while holding lock.
private void insert(E x) {
    final int slot = putIndex;
    items[slot] = x;
    putIndex = inc(slot);
    count++;
    notEmpty.signal();
}
 
// Removes and returns the element at takeIndex: clears its slot, advances
// takeIndex circularly (moving to the next position), decrements count and
// signals notFull. Performs no locking itself; callers hold lock.
private E extract() {
    final E[] buffer = this.items;
    final int slot = takeIndex;
    final E head = buffer[slot];
    buffer[slot] = null;
    takeIndex = inc(slot);
    count--;
    notFull.signal();
    return head;
}
 
// Deletes the element stored at array index i, then decrements count and
// signals notFull. Performs no locking itself.
void removeAt(int i) {
    final E[] buffer = this.items;
    if (i == takeIndex) {
        // Removing the head: clear the slot and simply advance takeIndex.
        buffer[takeIndex] = null;
        takeIndex = inc(takeIndex);
    } else {
        // Interior removal: slide every element after i (up to putIndex) back
        // by one slot; the slot vacated at the end becomes the new putIndex.
        int hole = i;
        int next = inc(hole);
        while (next != putIndex) {
            buffer[hole] = buffer[next];
            hole = next;
            next = inc(hole);
        }
        buffer[hole] = null;
        putIndex = hole;
    }
    count--;
    notFull.signal();
}
 
// Creates a queue with the given fixed capacity; delegates to the two-argument
// constructor with fair = false.
public ArrayBlockingQueue(int capacity) { 
this(capacity, false); 
} 
 
// Creates a queue with the given fixed capacity whose lock is constructed
// with the given fairness setting. Both conditions are created from that lock.
// Throws IllegalArgumentException when capacity is not positive.
public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity < 1) {
        throw new IllegalArgumentException();
    }
    this.items = (E[]) new Object[capacity];
    this.lock = new ReentrantLock(fair);
    this.notEmpty = this.lock.newCondition();
    this.notFull = this.lock.newCondition();
}
 
// Creates a queue with the given capacity and fairness, then adds every
// element of c in the collection's iteration order.
// Throws IllegalArgumentException when c holds more elements than capacity
// (or, via the delegated constructor, when capacity is not positive).
public ArrayBlockingQueue(int capacity, boolean fair,
        Collection<? extends E> c) {
    this(capacity, fair);
    if (c.size() > capacity) {
        throw new IllegalArgumentException();
    }
    for (E element : c) {
        add(element);
    }
}
 
// Inserts e by delegating unchanged to the superclass (AbstractQueue) add.
// NOTE(review): AbstractQueue.add is documented to call offer(e) and to throw
// IllegalStateException when offer returns false - confirm for the JDK in use.
public boolean add(E e) { 
return super.add(e); 
} 
 
// Non-blocking insert: under the lock, appends e and returns true when there
// is room, or returns false immediately when the array is full.
// Throws NullPointerException when e is null.
public boolean offer(E e) {
    if (e == null) {
        throw new NullPointerException();
    }
    final ReentrantLock guard = this.lock;
    guard.lock();
    try {
        final boolean hasRoom = count != items.length;
        if (hasRoom) {
            insert(e);
        }
        return hasRoom;
    } finally {
        guard.unlock();
    }
}
 
// Blocking insert: waits on notFull while the array is full, then appends e.
// Throws NullPointerException when e is null, and InterruptedException when
// the thread is interrupted while acquiring the lock or while waiting.
public void put(E e) throws InterruptedException {
    if (e == null) {
        throw new NullPointerException();
    }
    final ReentrantLock guard = this.lock;
    guard.lockInterruptibly();
    try {
        final int capacity = this.items.length;
        try {
            while (count == capacity) {
                notFull.await();
            }
        } catch (InterruptedException interrupted) {
            // Hand the wake-up on to a non-interrupted waiter before leaving.
            notFull.signal();
            throw interrupted;
        }
        insert(e);
    } finally {
        guard.unlock();
    }
}
 
// Timed insert: waits on notFull for up to the given timeout while the array
// is full. Returns true once e has been appended, or false if the time runs
// out first. Throws NullPointerException when e is null, and
// InterruptedException when interrupted while locking or waiting.
public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {
    if (e == null) {
        throw new NullPointerException();
    }
    long remaining = unit.toNanos(timeout);
    final ReentrantLock guard = this.lock;
    guard.lockInterruptibly();
    try {
        while (count == items.length) {
            if (remaining <= 0) {
                // Time is up and the queue is still full.
                return false;
            }
            try {
                remaining = notFull.awaitNanos(remaining);
            } catch (InterruptedException interrupted) {
                // Hand the wake-up on to a non-interrupted waiter before leaving.
                notFull.signal();
                throw interrupted;
            }
        }
        insert(e);
        return true;
    } finally {
        guard.unlock();
    }
}
// Retrieves and removes the head of this queue, or returns null without
// waiting when the queue is empty.
public E poll() {
    final ReentrantLock guard = this.lock;
    guard.lock();
    try {
        return (count == 0) ? null : extract();
    } finally {
        guard.unlock();
    }
}
// Retrieves and removes the head of this queue, waiting on notEmpty for as
// long as the queue is empty. Throws InterruptedException when interrupted
// while acquiring the lock or while waiting.
public E take() throws InterruptedException {
    final ReentrantLock guard = this.lock;
    guard.lockInterruptibly();
    try {
        try {
            while (count == 0) {
                notEmpty.await();
            }
        } catch (InterruptedException interrupted) {
            // Hand the wake-up on to a non-interrupted waiter before leaving.
            notEmpty.signal();
            throw interrupted;
        }
        return extract();
    } finally {
        guard.unlock();
    }
}
// Retrieves and removes the head of the queue, waiting on notEmpty for up to
// the given timeout while the queue is empty. Returns null if the time runs
// out before an element becomes available. Throws InterruptedException when
// interrupted while acquiring the lock or while waiting.
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    long remaining = unit.toNanos(timeout);
    final ReentrantLock guard = this.lock;
    guard.lockInterruptibly();
    try {
        while (count == 0) {
            if (remaining <= 0) {
                // Time is up and the queue is still empty.
                return null;
            }
            try {
                remaining = notEmpty.awaitNanos(remaining);
            } catch (InterruptedException interrupted) {
                // Hand the wake-up on to a non-interrupted waiter before leaving.
                notEmpty.signal();
                throw interrupted;
            }
        }
        return extract();
    } finally {
        guard.unlock();
    }
}
// Returns the head of the queue without removing it, or null when the queue
// is empty.
public E peek() {
    final ReentrantLock guard = this.lock;
    guard.lock();
    try {
        if (count == 0) {
            return null;
        }
        return items[takeIndex];
    } finally {
        guard.unlock();
    }
}
 
// Returns the number of elements currently in the queue, read under the lock.
public int size() {
    final ReentrantLock guard = this.lock;
    guard.lock();
    try {
        final int current = count;
        return current;
    } finally {
        guard.unlock();
    }
}
 
// Returns how many more elements fit right now: the array length minus the
// current element count, computed under the lock.
public int remainingCapacity() {
    final ReentrantLock guard = this.lock;
    guard.lock();
    try {
        final int capacity = items.length;
        return capacity - count;
    } finally {
        guard.unlock();
    }
}
 
// Removes the first element (scanning from the head) for which o.equals(element)
// holds. Returns true when an element was removed; returns false when o is
// null or no element matches.
public boolean remove(Object o) {
    if (o == null) {
        return false;
    }
    final E[] buffer = this.items;
    final ReentrantLock guard = this.lock;
    guard.lock();
    try {
        int idx = takeIndex;
        // Visit exactly count slots, starting at the head and wrapping around.
        for (int scanned = 0; scanned < count; scanned++) {
            if (o.equals(buffer[idx])) {
                removeAt(idx);
                return true;
            }
            idx = inc(idx);
        }
        return false;
    } finally {
        guard.unlock();
    }
}
 
// Reports whether some queued element satisfies o.equals(element). Returns
// false when o is null. The scan runs under the lock, from the head onward.
public boolean contains(Object o) {
    if (o == null) {
        return false;
    }
    final E[] buffer = this.items;
    final ReentrantLock guard = this.lock;
    guard.lock();
    try {
        int idx = takeIndex;
        // Visit exactly count slots, starting at the head and wrapping around.
        for (int scanned = 0; scanned < count; scanned++) {
            if (o.equals(buffer[idx])) {
                return true;
            }
            idx = inc(idx);
        }
        return false;
    } finally {
        guard.unlock();
    }
}
 ...  
} 

Related articles: