Blocking queues in Java are described in detail

  • 2020-05-12 02:45:28
  • OfStack

Blocking queues in Java

1. What is a blocking queue?

A blocking queue (BlockingQueue) is a queue that supports two additional operations. The two additional operations are:

When the queue is empty, the thread that gets the element waits for the queue to become non-empty.
When the queue is full, the thread storing the element waits for the queue to become available.

Blocking queues are often used in producer-consumer scenarios, where producers are the threads that add elements to the queue and consumers are the threads that take elements from it. The blocking queue is the container into which producers put elements and from which consumers take them.
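The producer-consumer pattern can be sketched with a bounded blocking queue. This is a minimal illustration (the class name and the capacity of 2 are arbitrary choices, not from the original): `put()` blocks the producer when the queue is full, and `take()` blocks the consumer when it is empty, so neither side needs explicit synchronization.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class ProducerConsumerDemo {
    public static void main(String[] args) throws InterruptedException {
        // Small capacity so the producer actually blocks now and then.
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2);

        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= 5; i++) {
                    queue.put(i);          // blocks while the queue is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        int sum = 0;
        for (int i = 0; i < 5; i++) {
            sum += queue.take();           // blocks while the queue is empty
        }
        producer.join();
        System.out.println("sum = " + sum); // 1+2+3+4+5
    }
}
```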

2. Blocking queues in Java

Seven blocking queues are provided in JDK:

  • ArrayBlockingQueue: a bounded blocking queue backed by an array.
  • LinkedBlockingQueue: a bounded blocking queue backed by a linked list.
  • PriorityBlockingQueue: an unbounded blocking queue that supports priorities.
  • DelayQueue: an unbounded blocking queue implemented with a priority queue.
  • SynchronousQueue: a blocking queue that does not store elements.
  • LinkedTransferQueue: an unbounded blocking queue backed by a linked list.
  • LinkedBlockingDeque: a bidirectional blocking queue backed by a linked list.

ArrayBlockingQueue

ArrayBlockingQueue is a bounded blocking queue implemented with an array. It orders elements first-in, first-out (FIFO). By default, the queue does not guarantee fair access. Fair access means that all blocked producer and consumer threads gain access to the queue in the order in which they blocked: the producer thread that blocked first is the first allowed to insert an element, and the consumer thread that blocked first is the first allowed to take one. Ensuring fairness usually reduces throughput. We can create a fair blocking queue with the following code:

ArrayBlockingQueue<String> fairQueue = new ArrayBlockingQueue<>(1000, true);

Fair access is implemented internally with a fair ReentrantLock.

LinkedBlockingQueue

LinkedBlockingQueue is a bounded blocking queue implemented with a linked list. If no capacity is given, the default and maximum length of the queue is Integer.MAX_VALUE. It orders elements first-in, first-out.

PriorityBlockingQueue

PriorityBlockingQueue is an unbounded blocking queue that supports priorities. By default, elements are ordered by their natural ordering (ascending); alternatively, you can supply a Comparator when constructing the queue to define a custom ordering.
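A short sketch of the two orderings described above (the class name is illustrative): one queue uses the natural ascending order, the other is given `Comparator.reverseOrder()` at construction.

```java
import java.util.Comparator;
import java.util.concurrent.PriorityBlockingQueue;

public class PriorityDemo {
    public static void main(String[] args) throws InterruptedException {
        // Natural ordering: the smallest element is taken first.
        PriorityBlockingQueue<Integer> natural = new PriorityBlockingQueue<>();
        natural.add(3); natural.add(1); natural.add(2);
        System.out.println(natural.take() + " " + natural.take() + " " + natural.take());

        // Custom comparator: reverse order, the largest element is taken first.
        PriorityBlockingQueue<Integer> reversed =
                new PriorityBlockingQueue<>(11, Comparator.reverseOrder());
        reversed.add(3); reversed.add(1); reversed.add(2);
        System.out.println(reversed.take() + " " + reversed.take() + " " + reversed.take());
    }
}
```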

DelayQueue

DelayQueue is an unbounded blocking queue that supports delayed retrieval of elements. It is backed by a PriorityQueue. Elements in the queue must implement the Delayed interface, and when creating an element you can specify how long it must wait before it can be taken from the queue. An element can only be taken once its delay has expired. DelayQueue is applicable in scenarios such as:

Cache system design: DelayQueue can hold cache elements keyed by their expiration time, and a single thread can loop on the queue to evict elements whose validity period has expired.

Scheduled task scheduling: use DelayQueue to hold the tasks to be executed along with their execution times; once a task is taken from the DelayQueue, it is executed. For example, TimerQueue is implemented with DelayQueue.

How to implement the Delayed interface

We can refer to the ScheduledFutureTask class in ScheduledThreadPoolExecutor, which implements the Delayed interface. First, when the object is created, record the absolute time at which the object should become available. The code is as follows:


ScheduledFutureTask(Runnable r, V result, long ns, long period) {
    super(r, result);
    this.time = ns;          // absolute trigger time, in nanoseconds
    this.period = period;
    this.sequenceNumber = sequencer.getAndIncrement();
}

Then use getDelay to query how long the current element still needs to be delayed. The code is as follows:


public long getDelay(TimeUnit unit) {
    return unit.convert(time - now(), TimeUnit.NANOSECONDS);
}

As the constructor shows, the unit of the delay parameter ns is nanoseconds. It is best to design getDelay around nanoseconds, because the caller can request any unit: once the time is stored in nanoseconds, converting to coarser units is straightforward, whereas storing a resolution coarser than what a caller requests is troublesome. Note that getDelay returns a negative number when time is earlier than the current time.

Finally, compareTo uses time to define each element's position in the queue, for example placing the element with the longest delay at the tail of the queue.


public int compareTo(Delayed other) {
    if (other == this)
        return 0;
    if (other instanceof ScheduledFutureTask) {
        ScheduledFutureTask x = (ScheduledFutureTask) other;
        long diff = time - x.time;
        if (diff < 0)
            return -1;
        else if (diff > 0)
            return 1;
        else if (sequenceNumber < x.sequenceNumber)
            return -1;
        else
            return 1;
    }
    long d = getDelay(TimeUnit.NANOSECONDS) - other.getDelay(TimeUnit.NANOSECONDS);
    return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
}
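Putting the two pieces together, here is a minimal, self-contained Delayed element that works with DelayQueue. The class and field names (`DelayedTask`, `triggerTime`) are my own for the sketch; it follows the same pattern as ScheduledFutureTask: store an absolute trigger time in nanoseconds, report the remaining delay in getDelay, and order by remaining delay in compareTo.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class DelayedTask implements Delayed {
    final String name;
    private final long triggerTime; // absolute trigger time, in nanoseconds

    DelayedTask(String name, long delayMillis) {
        this.name = name;
        this.triggerTime = System.nanoTime()
                + TimeUnit.MILLISECONDS.toNanos(delayMillis);
    }

    @Override
    public long getDelay(TimeUnit unit) {
        // Remaining delay converted to the caller's unit; negative once expired.
        return unit.convert(triggerTime - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        // Order by remaining delay: the soonest-to-expire element is the head.
        return Long.compare(getDelay(TimeUnit.NANOSECONDS),
                            other.getDelay(TimeUnit.NANOSECONDS));
    }

    public static void main(String[] args) throws InterruptedException {
        DelayQueue<DelayedTask> queue = new DelayQueue<>();
        queue.put(new DelayedTask("slow", 100));
        queue.put(new DelayedTask("fast", 10));
        // take() blocks until the head's delay expires; "fast" comes out first.
        System.out.println(queue.take().name);
        System.out.println(queue.take().name);
    }
}
```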

How to implement a delayed blocking queue

Implementing a delayed blocking queue is straightforward: when a consumer tries to take an element from the queue, the current thread blocks if the element's delay has not yet expired.


long delay = first.getDelay(TimeUnit.NANOSECONDS);
if (delay <= 0) {
    return q.poll();                 // delay expired: remove and return the head
} else if (leader != null) {
    // "leader" is the thread already waiting for the head element
    available.await();               // other threads wait for a signal
} else {
    // when leader is null, the current thread becomes the leader
    Thread thisThread = Thread.currentThread();
    try {
        leader = thisThread;
        // awaitNanos() makes the current thread wait until it is
        // signalled or until the delay elapses
        available.awaitNanos(delay);
    } finally {
        if (leader == thisThread) {
            leader = null;
        }
    }
}

SynchronousQueue

SynchronousQueue is a blocking queue that stores no elements. Each put operation must wait for a corresponding take operation, otherwise no further element can be added. SynchronousQueue can be viewed as a relay: it hands data produced by a producer thread directly to a consumer thread. Since the queue itself stores no elements, it is well suited to hand-off scenarios, such as passing data used in one thread to another thread for use, and SynchronousQueue has higher throughput than LinkedBlockingQueue and ArrayBlockingQueue.

It supports fair access to the queue; by default it uses an unfair policy.
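The hand-off behaviour can be sketched as follows (class name is illustrative): the producer's `put()` does not return until a consumer's `take()` arrives to receive the element, so nothing is ever stored in the queue.

```java
import java.util.concurrent.SynchronousQueue;

public class HandoffDemo {
    public static void main(String[] args) throws InterruptedException {
        SynchronousQueue<String> queue = new SynchronousQueue<>();

        Thread producer = new Thread(() -> {
            try {
                // put() blocks until a consumer is ready to take the element.
                queue.put("payload");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        // take() completes the direct hand-off; the queue never stores anything.
        System.out.println(queue.take());
        producer.join();
    }
}
```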

LinkedTransferQueue

LinkedTransferQueue is an unbounded blocking TransferQueue backed by a linked list. Compared with other blocking queues, LinkedTransferQueue additionally offers the tryTransfer and transfer methods.

transfer method

If a consumer is currently waiting to receive an element (because it called take() or a timed poll()), the transfer method hands the producer's element to that consumer immediately. If no consumer is waiting, transfer stores the element at the tail of the queue and does not return until the element has been consumed.

tryTransfer method

It tests whether an element offered by the producer can be handed directly to a consumer. If no consumer is waiting to receive the element, it returns false. The difference from transfer is that tryTransfer returns immediately whether or not a consumer accepts the element, whereas transfer does not return until the element has been consumed.

The timed tryTransfer(E e, long timeout, TimeUnit unit) method also attempts to hand the producer's element directly to a consumer, but if no consumer takes it, it waits for the specified time: it returns false if the element has not been consumed when the timeout elapses, and true if the element is consumed within the timeout.
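The contrast between the two methods can be sketched like this (class name is illustrative): with no consumer waiting, `tryTransfer` fails immediately, while `transfer` blocks until a consumer thread actually takes the element.

```java
import java.util.concurrent.LinkedTransferQueue;

public class TransferDemo {
    public static void main(String[] args) throws InterruptedException {
        LinkedTransferQueue<String> queue = new LinkedTransferQueue<>();

        // No consumer is waiting, so tryTransfer returns false immediately
        // and the element is not enqueued.
        System.out.println(queue.tryTransfer("dropped"));

        Thread consumer = new Thread(() -> {
            try {
                System.out.println("received: " + queue.take());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        // transfer() does not return until the consumer has taken the element.
        queue.transfer("handoff");
        consumer.join();
    }
}
```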

LinkedBlockingDeque

LinkedBlockingDeque is a bidirectional (double-ended) blocking queue backed by a linked list. Bidirectional means that elements can be inserted and removed at both ends of the queue. Because a deque offers one more entry point for operations, contention is roughly halved when multiple threads enqueue at the same time. LinkedBlockingDeque adds methods such as addFirst, addLast, offerFirst, offerLast, peekFirst, and peekLast: methods ending in First insert, retrieve, or remove the first element of the deque, and methods ending in Last insert, retrieve, or remove the last element. In addition, the insert method add is equivalent to addLast, and the removal method remove is equivalent to removeFirst; however, take is equivalent to takeFirst, and it is unclear whether that is a JDK bug. When constructing a LinkedBlockingDeque, the capacity can be set to prevent the queue from growing too large. Bidirectional blocking queues are also useful in the "work stealing" pattern.
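A small sketch of the two-ended API described above (class name is illustrative), including the work-stealing idea: an owner thread would consume from one end while a "thief" thread polls from the other, so the two rarely contend for the same node.

```java
import java.util.concurrent.LinkedBlockingDeque;

public class DequeDemo {
    public static void main(String[] args) {
        // Capacity fixed at construction to keep the deque from growing too large.
        LinkedBlockingDeque<String> deque = new LinkedBlockingDeque<>(10);

        deque.addLast("middle");
        deque.addFirst("head");   // insert at the front
        deque.addLast("tail");    // insert at the back

        // Work-stealing style: the owner pops from the front,
        // while a thief thread could poll from the back.
        System.out.println(deque.pollFirst()); // head
        System.out.println(deque.pollLast());  // tail
        System.out.println(deque.pollFirst()); // middle
    }
}
```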

Thank you for reading, I hope to help you, thank you for your support of this site!

