Analysis of how the DelayQueue delay queue works in Java concurrency

  • 2021-09-11 20:27:01
  • OfStack

Introduction

DelayQueue is a delay queue. Every element stored in a DelayQueue must implement the Delayed interface, which gives each element an expiration time. When a thread retrieves an element with take, the queue checks whether the head element has expired. Only expired elements can be dequeued; if the head element has not expired yet, the caller has to wait for the remaining delay before the element can be removed.
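
As a rough illustration of this behavior, the following sketch puts a single element with a 200 ms delay into a DelayQueue and measures how long take blocks. The Message class and the millisBlockedInTake helper are hypothetical names introduced only for this example; they are not part of the JDK.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

class TakeBlocksDemo {

    /** Hypothetical element type: it expires delayMillis after it is created. */
    static final class Message implements Delayed {
        final String text;
        final long deadlineNanos;

        Message(String text, long delayMillis) {
            this.text = text;
            this.deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            // Zero or negative means the element has expired.
            return unit.convert(deadlineNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    /** Puts one element with the given delay and returns how long take() blocked, in ms. */
    static long millisBlockedInTake(long delayMillis) {
        DelayQueue<Message> queue = new DelayQueue<>();
        queue.put(new Message("hello", delayMillis));
        long start = System.nanoTime();
        try {
            queue.take(); // blocks until the head element has expired
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    public static void main(String[] args) {
        System.out.println("take() blocked for about " + millisBlockedInTake(200) + " ms");
    }
}
```

The measured time should be close to the configured delay, because take does not return the element before its delay has elapsed.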

Source code analysis

DelayQueue stores its data in a PriorityQueue, which is implemented as a binary heap, and uses a ReentrantLock to control thread synchronization. Because the elements are kept in a PriorityQueue, the Delayed interface extends Comparable so that elements can be compared to determine their priority, as shown in the following code:


public interface Delayed extends Comparable<Delayed> {

    /**
     * Returns the remaining delay associated with this object, in the
     * given time unit.
     *
     * @param unit the time unit
     * @return the remaining delay; zero or negative values indicate
     * that the delay has already elapsed
     */
    long getDelay(TimeUnit unit);
}
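
To make the role of compareTo concrete, here is a minimal sketch of a Delayed implementation, assuming a hypothetical Task class that stores an absolute deadline. Elements are added out of order and come back from take in expiration order, because the internal PriorityQueue orders them with compareTo. The class and method names are illustrative only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

class DelayedOrderingDemo {

    /** Hypothetical element type that keeps an absolute deadline in nanoseconds. */
    static final class Task implements Delayed {
        final String name;
        final long deadlineNanos;

        Task(String name, long delayMillis) {
            this.name = name;
            this.deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(deadlineNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            if (other instanceof Task) {
                // Compare deadlines by difference, the usual idiom for nanoTime values.
                return Long.signum(deadlineNanos - ((Task) other).deadlineNanos);
            }
            return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    /** Adds three tasks out of order and returns their names in the order take() yields them. */
    static List<String> drainInExpiryOrder() {
        DelayQueue<Task> queue = new DelayQueue<>();
        queue.put(new Task("slow", 90));
        queue.put(new Task("fast", 30));
        queue.put(new Task("medium", 60));
        List<String> order = new ArrayList<>();
        try {
            while (!queue.isEmpty()) {
                order.add(queue.take().name);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println(drainInExpiryOrder());
    }
}
```

The ordering defined by compareTo should agree with getDelay; otherwise the head of the priority queue might not be the element that expires first.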

The member variables of DelayQueue are as follows:


// Lock that guards all access to the queue.
private final transient ReentrantLock lock = new ReentrantLock();
// Priority queue that stores the elements.
private final PriorityQueue<E> q = new PriorityQueue<E>();

/**
 * Thread designated to wait for the element at the head of
 * the queue.  This variant of the Leader-Follower pattern
 * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
 * minimize unnecessary timed waiting.  When a thread becomes
 * the leader, it waits only for the next delay to elapse, but
 * other threads await indefinitely.  The leader thread must
 * signal some other thread before returning from take() or
 * poll(...), unless some other thread becomes leader in the
 * interim.  Whenever the head of the queue is replaced with
 * an element with an earlier expiration time, the leader
 * field is invalidated by being reset to null, and some
 * waiting thread, but not necessarily the current leader, is
 * signalled.  So waiting threads must be prepared to acquire
 * and lose leadership while waiting.
 */
private Thread leader = null;

/**
 * Condition signalled when a newer element becomes available
 * at the head of the queue or a new thread may need to
 * become leader.
 */
// Waiting threads block on this condition; it is signalled to wake one of them
// when an element may be available or a new leader is needed.
private final Condition available = lock.newCondition();

DelayQueue uses a variant of the Leader-Follower pattern to minimize unnecessary timed waiting. When a thread becomes the leader, it waits only until the delay of the head element has elapsed, while the other threads (the followers) wait indefinitely. Before returning from take or poll(...), the leader must signal one of the waiting threads, unless another thread has become leader in the meantime. Whenever the head of the queue is replaced with an element that has an earlier expiration time, the leader field is invalidated by resetting it to null and a waiting thread is signalled; the awakened thread can then become the new leader.
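
One way to observe this pattern from the outside is to look at the states of two consumer threads blocked in take. The sketch below is only an experiment under stated assumptions: the Task class and the 200 ms settling pause are hypothetical choices, and the snapshot relies on both threads having reached their wait by then. The leader uses a timed wait, so it is expected to show up as TIMED_WAITING, while the follower is expected to be WAITING.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

class LeaderFollowerStatesDemo {

    /** Hypothetical element type that expires delayMillis after creation. */
    static final class Task implements Delayed {
        final long deadlineNanos;

        Task(long delayMillis) {
            this.deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(deadlineNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    /** Returns the sorted state names of two consumers while both are blocked in take(). */
    static List<String> consumerStates() {
        DelayQueue<Task> queue = new DelayQueue<>();
        queue.put(new Task(600));
        queue.put(new Task(600));
        Runnable consumer = () -> {
            try {
                queue.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        Thread a = new Thread(consumer, "consumer-a");
        Thread b = new Thread(consumer, "consumer-b");
        a.setDaemon(true);
        b.setDaemon(true);
        a.start();
        b.start();
        try {
            Thread.sleep(200); // assumption: enough time for both threads to block in take()
            List<String> states = new ArrayList<>(
                    Arrays.asList(a.getState().name(), b.getState().name()));
            Collections.sort(states);
            a.join(); // each consumer eventually takes one of the two elements
            b.join();
            return states;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(consumerStates());
    }
}
```

Only one thread pays for a timed wait at any moment; the follower is woken by the signal that the leader sends in the finally block of take.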

offer operation


public boolean offer(E e) {
    // Acquire the lock.
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // Store the element in the PriorityQueue.
        q.offer(e);
        // If the new element is now the head of the queue (the queue was empty, or
        // the element expires earlier than the old head), reset leader to null and
        // signal a waiting thread so that it can compete to become the leader.
        if (q.peek() == e) {
            leader = null;
            available.signal();
        }
        // Report success.
        return true;
    } finally {
        lock.unlock();
    }
}

offer begins by acquiring the lock, so only one thread can add an element at a time.

It first obtains the ReentrantLock and then adds the element to the PriorityQueue. If the element that expires earliest is now the one that was just added, either the queue was empty or the new element expires before the old head. In that case leader is reset to null and a waiting thread is signalled so that it can become the leader. Finally, the lock is released.
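
The effect of resetting leader in offer can be sketched with a small experiment. A consumer first blocks on an element that expires in 5 seconds; a second element with a 100 ms delay is then offered and becomes the new head. If offer did not signal the waiting thread, the consumer would sleep for the full 5 seconds. The Task class and the firstTaken helper are hypothetical names used only in this sketch.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class OfferWakesTakerDemo {

    /** Hypothetical element type that expires delayMillis after creation. */
    static final class Task implements Delayed {
        final String name;
        final long deadlineNanos;

        Task(String name, long delayMillis) {
            this.name = name;
            this.deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(deadlineNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    /** Returns the name of the element the consumer took within 2 s, or null if it took none. */
    static String firstTaken() {
        DelayQueue<Task> queue = new DelayQueue<>();
        queue.put(new Task("late", 5000));
        AtomicReference<String> taken = new AtomicReference<>();
        Thread consumer = new Thread(() -> {
            try {
                taken.set(queue.take().name);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.setDaemon(true);
        consumer.start();
        try {
            Thread.sleep(100);                   // let the consumer start waiting on "late"
            queue.offer(new Task("early", 100)); // new head: leader is reset, a waiter is signalled
            consumer.join(2000);                 // far less than the 5 s delay of "late"
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return taken.get();
    }

    public static void main(String[] args) {
        System.out.println("consumer took: " + firstTaken());
    }
}
```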

put operation

put simply calls offer to add the element. Because the queue is unbounded, put never blocks. The source code is as follows:


public void put(E e) {
    offer(e);
}
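
A short sketch of what "never blocks" means in practice: the calling thread can put many unexpired elements without waiting, and remainingCapacity always reports Integer.MAX_VALUE. The Task class and the helper names are assumptions made for this example.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

class PutNeverBlocksDemo {

    /** Hypothetical element type that expires delayMillis after creation. */
    static final class Task implements Delayed {
        final long deadlineNanos;

        Task(long delayMillis) {
            this.deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(deadlineNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    /** Puts count elements that will not expire for a minute and returns the queue size. */
    static int sizeAfterPuts(int count) {
        DelayQueue<Task> queue = new DelayQueue<>();
        for (int i = 0; i < count; i++) {
            queue.put(new Task(60_000)); // returns immediately; no consumer is needed
        }
        return queue.size(); // size() counts expired and unexpired elements alike
    }

    /** A DelayQueue is not capacity constrained. */
    static int remainingCapacity() {
        return new DelayQueue<Task>().remainingCapacity();
    }

    public static void main(String[] args) {
        System.out.println(sizeAfterPuts(1000) + " elements queued");
        System.out.println("remaining capacity: " + remainingCapacity());
    }
}
```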

take operation


public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    // Acquire the lock, allowing the wait to be interrupted.
    lock.lockInterruptibly();
    try {
        // Loop until an expired element can be returned.
        for (;;) {
            // Look at the element that expires earliest without removing it.
            E first = q.peek();
            // A null head means the queue is empty.
            if (first == null)
                // Wait indefinitely until signalled; the lock is released while waiting.
                available.await();
            else {
                // Remaining delay of the element that expires earliest.
                long delay = first.getDelay(NANOSECONDS);
                // A delay of zero or less means the element has expired.
                if (delay <= 0)
                    // Remove the expired element and return it.
                    return q.poll();
                // From here on the head element has not expired yet.
                // Drop the local reference to the head element.
                first = null; // don't retain ref while waiting
                // Another thread is already the leader, so this thread is a follower.
                if (leader != null)
                    // Wait indefinitely; the lock is released while waiting.
                    available.await();
                else {
                    // Get the current thread.
                    Thread thisThread = Thread.currentThread();
                    // Make the current thread the leader.
                    leader = thisThread;
                    try {
                        // Wait only for the remaining delay.
                        available.awaitNanos(delay);
                    } finally {
                        // Give up leadership if this thread is still the leader.
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        // If there is no leader and the queue is not empty, a waiting thread
        // can become the leader.
        if (leader == null && q.peek() != null)
            // Wake one waiting thread.
            available.signal();
        lock.unlock();
    }
}

take works as follows. The thread first acquires the lock interruptibly and then peeks at the element that expires earliest, without removing it from the queue. If there is no such element, the queue is empty, so the thread waits indefinitely on the condition, which releases the lock while it waits. If a head element exists, take reads its remaining delay and, if the delay is zero or negative, removes the element and returns it immediately.

If the head element has not expired yet, the thread checks the leader field. If another thread is already the leader, the current thread waits indefinitely. If there is no leader, the current thread makes itself the leader and waits only for the remaining delay; when that wait ends, it clears the leader field if it is still the leader and loops to check the head again. Finally, before releasing the lock, take signals one waiting thread if there is no leader and the queue is not empty.
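
Because take acquires the lock with lockInterruptibly and waits on a Condition, a thread blocked in take can be cancelled with an interrupt. The following sketch, with the hypothetical helper takeThrowsWhenInterrupted, blocks a consumer on an empty queue and then interrupts it.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.atomic.AtomicBoolean;

class TakeInterruptDemo {

    /** Returns true if a consumer blocked in take() received an InterruptedException. */
    static boolean takeThrowsWhenInterrupted() {
        // The queue stays empty, so take() would otherwise wait indefinitely.
        DelayQueue<Delayed> queue = new DelayQueue<>();
        AtomicBoolean interrupted = new AtomicBoolean(false);
        Thread consumer = new Thread(() -> {
            try {
                queue.take();
            } catch (InterruptedException e) {
                interrupted.set(true);
            }
        });
        consumer.setDaemon(true);
        consumer.start();
        try {
            Thread.sleep(100);   // let the consumer reach available.await()
            consumer.interrupt();
            consumer.join(2000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return interrupted.get();
    }

    public static void main(String[] args) {
        System.out.println("take() was interrupted: " + takeThrowsWhenInterrupted());
    }
}
```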

poll operation

poll retrieves the head element only if it has already expired. If the queue is empty or the head element has not expired yet, it returns null immediately without waiting; otherwise it removes and returns the expired element.


public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        E first = q.peek();
        // Return null if the queue is empty or the head element has not expired yet.
        if (first == null || first.getDelay(NANOSECONDS) > 0)
            return null;
        else
            // Remove and return the expired head element.
            return q.poll();
    } finally {
        lock.unlock();
    }
}
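
The three outcomes of poll can be checked with a short sketch. The Task class, the 200 ms delay, and the 300 ms pause are assumptions chosen for this example.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

class PollDemo {

    /** Hypothetical element type that expires delayMillis after creation. */
    static final class Task implements Delayed {
        final String name;
        final long deadlineNanos;

        Task(String name, long delayMillis) {
            this.name = name;
            this.deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(deadlineNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    /** Calls poll() on an empty queue, before the head expires, and after it expires. */
    static String pollOutcomes() {
        DelayQueue<Task> queue = new DelayQueue<>();
        Task onEmpty = queue.poll();        // empty queue: null
        queue.put(new Task("job", 200));
        Task beforeExpiry = queue.poll();   // head not expired yet: null, no waiting
        try {
            Thread.sleep(300);              // wait past the 200 ms delay
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        Task afterExpiry = queue.poll();    // head has expired: it is removed and returned
        return (onEmpty == null) + "," + (beforeExpiry == null) + ","
                + (afterExpiry == null ? "null" : afterExpiry.name);
    }

    public static void main(String[] args) {
        System.out.println(pollOutcomes());
    }
}
```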

Summary

DelayQueue is an unbounded blocking queue of delayed elements that is safe for concurrent use. Its elements must implement the Delayed interface, which extends Comparable, so each element supplies both its remaining delay (getDelay) and the ordering used by the internal PriorityQueue (compareTo). To minimize unnecessary timed waiting, DelayQueue uses a variant of the Leader-Follower pattern. The thread chosen as leader waits only until the delay of the head element has elapsed, while the other threads wait indefinitely. Before returning from take or poll(...), the leader must signal one of the waiting threads, which can then become the new leader. Whenever the head of the queue is replaced with an element that has an earlier expiration time, the leader field is invalidated by resetting it to null and a waiting thread is signalled.
