Analysis of Condition Source Code for Java Concurrent Programming (Recommended)

  • 2021-07-10 19:40:04
  • OfStack

Introduction to Condition

This article supplements the previous one on ReentrantLock, which covered how ReentrantLock acquires and releases a lock. ReentrantLock#newCondition() creates a Condition. While holding a ReentrantLock, a thread can use the Condition to block itself and temporarily release the lock; another thread then acquires the lock, runs its own logic, and notifies the blocked thread to "activate" it. Condition is commonly used to build a synchronous mechanism on top of asynchronous communication, such as sending a request and waiting for its response in dubbo.

Commonly used methods

Condition has two main methods:

(1) await() blocks the current thread and releases the lock.
(2) signal() is called by a thread that holds the lock to notify a thread blocked in await() and "activate" it.

Both await() and signal() must be called between ReentrantLock#lock() and ReentrantLock#unlock().
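
The calling pattern can be sketched as follows. This is a minimal illustration, not dubbo's actual code: the class ResponseHolder and its methods get(), complete() and roundTrip() are hypothetical names chosen for the example. One thread waits for a response inside lock()/unlock(), and another thread supplies it and signals.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical one-shot holder: one thread waits for a response, another supplies it.
class ResponseHolder {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition done = lock.newCondition();
    private String response; // guarded by lock

    // Blocks until complete() has supplied a value.
    String get() throws InterruptedException {
        lock.lock();
        try {
            while (response == null) { // re-check: await() may wake up spuriously
                done.await();          // releases the lock while blocked
            }
            return response;
        } finally {
            lock.unlock();
        }
    }

    void complete(String value) {
        lock.lock();
        try {
            response = value;
            done.signal();
        } finally {
            lock.unlock(); // the waiter can only return from await() after this
        }
    }

    // Runs one request/response round trip and returns what the waiter received.
    static String roundTrip() {
        ResponseHolder holder = new ResponseHolder();
        Thread responder = new Thread(() -> holder.complete("pong"));
        responder.start();
        try {
            String result = holder.get();
            responder.join();
            return result;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip()); // prints "pong"
    }
}
```

The while loop around await() is the usual guard: the waiter re-checks its own condition after every wake-up instead of trusting the wake-up itself.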

Analysis of Condition Implementation

Condition is also implemented on top of the AbstractQueuedSynchronizer queues. When await() is called, it first adds the current thread to the condition's wait queue, then releases the lock, and finally blocks the current thread. When signal() is called, it takes the first node in the wait queue, converts it into an ordinary lock node, and appends it to the tail of the sync queue (the queue of threads blocked waiting for the lock). When the thread of the preceding node releases the lock, it wakes this node's thread so that it can acquire the lock.

Source code analysis of await () method

The source code of await () is as follows


public final void await() throws InterruptedException {
      // If the current thread has already been interrupted, throw InterruptedException
      if (Thread.interrupted())
        throw new InterruptedException();
      // Add the current thread to the condition wait queue
      Node node = addConditionWaiter();
      // Fully release the lock held by the current thread and remember its state
      int savedState = fullyRelease(node);
      int interruptMode = 0;
      // Loop until the node has been moved to the sync queue
      while (!isOnSyncQueue(node)) {
        // Block the current thread
        LockSupport.park(this);
        // After waking up, check whether the thread was interrupted
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
          break;
      }
      // Reacquire the lock; if interrupted while acquiring, record REINTERRUPT
      if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
      // Remove nodes that are no longer in CONDITION state from the wait queue
      if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
      // If an interrupt was recorded, handle it
      if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
    }

The method is mainly divided into the following steps:

(1) First check whether the current thread has been interrupted; if so, throw InterruptedException. Otherwise call addConditionWaiter() to join the wait queue.
(2) Call fullyRelease(node) to release the lock, so that the thread of the next node in the sync queue can acquire it.
(3) Call isOnSyncQueue(node) to check whether the node is on the sync queue. A node is moved to the sync queue when another thread calls signal(). If the node is not on the sync queue, the thread blocks until it is woken up.
(4) Once woken up, call checkInterruptWhileWaiting(node) to check whether the thread was interrupted and, if so, obtain the interrupt mode.
(5) If there was no interrupt, call isOnSyncQueue(node) again to check whether the node is now on the sync queue.
(6) If it is, call acquireQueued(node, savedState) to reacquire the lock, blocking again if the lock is not available. The lock may not be available yet: for example, another thread may have called signal() before the first isOnSyncQueue(node) check, so the loop is skipped and acquireQueued(node, savedState) is reached while that thread still holds the lock. acquireQueued(node, savedState) also returns whether the thread was interrupted while acquiring; if so, the interrupt mode is set.
(7) After the lock has been reacquired, call unlinkCancelledWaiters() to remove nodes that are no longer in CONDITION state from the wait queue.
(8) Finally, if an interrupt was recorded, handle it.

Let's take a look at the addConditionWaiter () implementation


private Node addConditionWaiter() {
      // Get the tail node of the wait queue
      Node t = lastWaiter;
      // If the tail is no longer in CONDITION state, clean up such nodes and re-read the tail
      if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        t = lastWaiter;
      }
      // Create a node for the current thread with its state set to CONDITION
      Node node = new Node(Thread.currentThread(), Node.CONDITION);
      // If there is no tail node, the queue is empty, so the new node becomes the head
      if (t == null)
        firstWaiter = node;
      // Otherwise link the new node after the current tail
      else
        t.nextWaiter = node;
      // The new node is now the tail
      lastWaiter = node;
      return node;
    }

The logic of addConditionWaiter() is simple: create a node for the current thread and append it to the tail of the wait queue, where other threads can later process it.

Let's look at the implementation of fullyRelease (Node node)


final int fullyRelease(Node node) {
    boolean failed = true;
    try {
      // Read the current lock state (the hold count when the lock has been reentered)
      int savedState = getState();
      // Release the lock completely
      if (release(savedState)) {
        failed = false;
        return savedState;
      } else {
        throw new IllegalMonitorStateException();
      }
    } finally {
      // If the release failed, mark the node as cancelled
      if (failed)
        node.waitStatus = Node.CANCELLED;
    }
  }

getState() first reads the lock's state value, which may be greater than 1 when the current thread has reentered the lock several times. release(savedState) then releases all of those holds at once, and the state value is returned if the release succeeds.
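
The effect of releasing every hold at once can be observed from outside with ReentrantLock#getHoldCount(). The sketch below uses a hypothetical class FullyReleaseDemo: the main thread locks twice, waits on the condition, and a second thread can acquire the lock in the meantime. awaitUninterruptibly() is used only to keep the example free of checked exceptions.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class FullyReleaseDemo {
    // Returns {hold count before waiting, hold count seen by the signaller, hold count after waiting}.
    static int[] run() {
        ReentrantLock lock = new ReentrantLock();
        Condition cond = lock.newCondition();
        int[] counts = new int[3];
        boolean[] signalled = {false}; // guarded by lock

        Thread signaller = new Thread(() -> {
            lock.lock(); // succeeds only after the waiter has released both holds
            try {
                counts[1] = lock.getHoldCount();
                signalled[0] = true;
                cond.signal();
            } finally {
                lock.unlock();
            }
        });

        lock.lock();
        lock.lock(); // reentrant acquire: the lock state is now 2
        try {
            counts[0] = lock.getHoldCount();
            signaller.start();
            while (!signalled[0]) {
                cond.awaitUninterruptibly(); // releases both holds, then restores them
            }
            counts[2] = lock.getHoldCount();
        } finally {
            lock.unlock();
            lock.unlock();
        }
        return counts;
    }

    public static void main(String[] args) {
        int[] c = run();
        System.out.println(c[0] + " " + c[1] + " " + c[2]); // prints "2 1 2"
    }
}
```

The saved state value is what lets the waiter come back with the same hold count it had before waiting.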

Let's look at the implementation of isOnSyncQueue (Node node)


final boolean isOnSyncQueue(Node node) {
    // If the node is still in CONDITION state or has no predecessor, it is not on the sync queue
    if (node.waitStatus == Node.CONDITION || node.prev == null)
      return false;
    // If the node has a successor, it must be on the sync queue
    if (node.next != null) // If has successor, it must be on queue
      return true;
    // Otherwise traverse the sync queue from the tail to look for the node
    return findNodeFromTail(node);
  }
  private boolean findNodeFromTail(Node node) {
    Node t = tail;
    for (;;) {
      if (t == node)
        return true;
      if (t == null)
        return false;
      t = t.prev;
    }
  }

This method checks whether the given node is on the sync queue. It first tries two quick checks and falls back to a traversal only when they are inconclusive.

(1) First check whether the node is in CONDITION state or has no predecessor. If either holds, the node is not on the sync queue and false is returned. Nodes on the sync queue are normally in state 0 or SIGNAL, and a node that is queued there and still waiting always has a non-null predecessor.
(2) Then check whether the node has a successor. If it does, the node has already joined the sync queue.
(3) If neither check is conclusive, the node may have only just joined the sync queue, so findNodeFromTail(Node node) performs a final traversal. The search starts from the tail because a node that was just enqueued sits at or near the tail and is found quickly from there.

Let's look at the implementation of checkInterruptWhileWaiting (Node node)


private int checkInterruptWhileWaiting(Node node) {
      return Thread.interrupted() ?
        (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
        0;
    }

final boolean transferAfterCancelledWait(Node node) {
    if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
      enq(node);
      return true;
    }
    while (!isOnSyncQueue(node))
      Thread.yield();
    return false;
  }

This method is called after the thread wakes up, and its main job is to determine whether the woken thread was interrupted. It yields one of two interrupt modes, THROW_IE or REINTERRUPT, or 0 if there was no interrupt. THROW_IE means the interrupt happened before signal() was called; REINTERRUPT means it happened after. The method first checks the interrupt flag and then calls transferAfterCancelledWait(node) to decide which mode applies. transferAfterCancelledWait(node) works in two steps:

(1) Use CAS to change the node state from CONDITION to 0. If that succeeds, add the node to the sync queue and return true.
(2) If the CAS fails, signal() has already started transferring the node, so spin until the node is on the sync queue and return false.

With plain await(), this path is reached only when the waiting thread is interrupted. If the interrupt arrives before signal(), the CAS succeeds and the method returns true (THROW_IE). If signal() has already been called, the node is already being moved to the sync queue, so the CAS fails and the method returns false (REINTERRUPT). The timed variants such as await(long time, TimeUnit unit) also call transferAfterCancelledWait(node) when the wait times out.
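
The difference between the two interrupt modes is visible to callers of await(). The following sketch, with a hypothetical class InterruptModeDemo, interrupts a waiting thread either before or after signal() and records how await() ends. The outcome strings are labels made up for this example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class InterruptModeDemo {
    // Interrupts one waiter, optionally after signalling it, and reports what await() did.
    static String run(boolean signalFirst) {
        ReentrantLock lock = new ReentrantLock();
        Condition cond = lock.newCondition();
        CountDownLatch aboutToWait = new CountDownLatch(1);
        String[] outcome = new String[1];

        Thread waiter = new Thread(() -> {
            lock.lock();
            try {
                aboutToWait.countDown(); // still holding the lock here
                cond.await();            // single call on purpose, to observe one wake-up
                outcome[0] = Thread.currentThread().isInterrupted()
                        ? "returned normally, interrupt flag set"
                        : "returned normally";
            } catch (InterruptedException e) {
                outcome[0] = "threw InterruptedException";
            } finally {
                lock.unlock();
            }
        });
        waiter.start();
        try {
            aboutToWait.await();
            lock.lock(); // acquired only after the waiter released the lock inside await()
            try {
                if (signalFirst) {
                    cond.signal(); // node moves to the sync queue before the interrupt
                }
                waiter.interrupt();
            } finally {
                lock.unlock();
            }
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return outcome[0];
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // prints "threw InterruptedException"
        System.out.println(run(true));  // prints "returned normally, interrupt flag set"
    }
}
```

In both cases the waiter holds the lock again by the time await() ends, which is why the unlock() in its finally block is safe.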

The acquireQueued(node, savedState) method was discussed in the previous article, so we will not repeat it here. Let's analyze the unlinkCancelledWaiters() method next.


private void unlinkCancelledWaiters() {
      // Start from the head of the wait queue
      Node t = firstWaiter;
      // Last node seen that is still in CONDITION state
      Node trail = null;
      while (t != null) {
        // Remember the next node
        Node next = t.nextWaiter;
        // A state other than CONDITION means the node has left the wait and must be unlinked
        if (t.waitStatus != Node.CONDITION) {
          t.nextWaiter = null;
          if (trail == null)
            firstWaiter = next;
          else
            // Link the last kept node directly to the next node
            trail.nextWaiter = next;
          if (next == null)
            lastWaiter = trail;
        }
        else
          trail = t;
        t = next;
      }
    }

This method walks the wait queue from the head and unlinks every node whose state is no longer CONDITION. Such a node has already joined the sync queue, so the wait queue no longer needs to track it.
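
The pointer handling is easier to follow on a stripped-down model. The sketch below is a toy, not the JDK classes: ToyWaitQueue and its Node are hypothetical, keep only the fields used by the wait queue, and reuse the value -2 that Node.CONDITION has in the JDK 8 source. It appends three nodes, takes two of them out of CONDITION state, and runs the same unlinking loop.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the singly linked condition wait queue (not the JDK implementation).
class ToyWaitQueue {
    static final int CONDITION = -2; // value of Node.CONDITION in the JDK 8 source

    static final class Node {
        final String name;
        int waitStatus = CONDITION;
        Node nextWaiter;
        Node(String name) { this.name = name; }
    }

    Node firstWaiter;
    Node lastWaiter;

    // Append a node at the tail, as addConditionWaiter() does.
    Node add(String name) {
        Node node = new Node(name);
        if (lastWaiter == null) firstWaiter = node;
        else lastWaiter.nextWaiter = node;
        lastWaiter = node;
        return node;
    }

    // Same traversal as the method above: drop every node that is not in CONDITION state.
    void unlinkCancelledWaiters() {
        Node t = firstWaiter;
        Node trail = null; // last node that was kept
        while (t != null) {
            Node next = t.nextWaiter;
            if (t.waitStatus != CONDITION) {
                t.nextWaiter = null;
                if (trail == null) firstWaiter = next;
                else trail.nextWaiter = next;
                if (next == null) lastWaiter = trail;
            } else {
                trail = t;
            }
            t = next;
        }
    }

    List<String> names() {
        List<String> out = new ArrayList<>();
        for (Node n = firstWaiter; n != null; n = n.nextWaiter) out.add(n.name);
        return out;
    }

    static List<String> demo() {
        ToyWaitQueue q = new ToyWaitQueue();
        Node a = q.add("A");
        q.add("B");
        Node c = q.add("C");
        a.waitStatus = 0; // pretend A and C have left CONDITION state
        c.waitStatus = 0;
        q.unlinkCancelledWaiters();
        return q.names();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "[B]"
    }
}
```

Removing the head and the tail in the same pass exercises both special cases: firstWaiter is advanced past A, and lastWaiter is pulled back to B.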

Let's look at the reportInterruptAfterWait (interruptMode) method


private void reportInterruptAfterWait(int interruptMode)
      throws InterruptedException {
      // In THROW_IE mode, throw the exception directly
      if (interruptMode == THROW_IE)
        throw new InterruptedException();
      // In REINTERRUPT mode, re-set the interrupt flag and let the caller handle it
      else if (interruptMode == REINTERRUPT)
        selfInterrupt();
    }

This method handles the interrupt logic. In THROW_IE mode it throws InterruptedException directly; in REINTERRUPT mode it re-sets the thread's interrupt flag and leaves the handling to the caller.

Source code analysis of signal () method

The source code of signal () is as follows


public final void signal() {
      // Check that the current thread holds the lock
      if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
      Node first = firstWaiter;
      // "Activate" the thread of the head node
      if (first != null)
        doSignal(first);
    }

First, isHeldExclusively() checks that the lock is held by the current thread; then the wait queue is checked. If it is not empty, the first node is passed to doSignal(first) to be "activated". This activation is not a real wake-up: it only moves the node to the tail of the sync queue. That is what "activate" in quotation marks means throughout this article.
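
The ownership check is easy to confirm from calling code. A minimal sketch, with a hypothetical class SignalWithoutLockDemo: calling signal() without holding the lock throws IllegalMonitorStateException, while calling it under the lock with an empty wait queue simply does nothing.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class SignalWithoutLockDemo {
    // True if signal() rejects a caller that does not hold the lock.
    static boolean signalWithoutLockThrows() {
        ReentrantLock lock = new ReentrantLock();
        Condition cond = lock.newCondition();
        try {
            cond.signal(); // the current thread is not the owner
            return false;
        } catch (IllegalMonitorStateException expected) {
            return true;
        }
    }

    // True if signal() under the lock completes normally even with no waiters.
    static boolean signalWithLockSucceeds() {
        ReentrantLock lock = new ReentrantLock();
        Condition cond = lock.newCondition();
        lock.lock();
        try {
            cond.signal(); // wait queue is empty, so nothing is transferred
            return true;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        System.out.println(signalWithoutLockThrows()); // prints "true"
        System.out.println(signalWithLockSucceeds());  // prints "true"
    }
}
```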

Let's take a look at the isHeldExclusively () implementation


 protected final boolean isHeldExclusively() {
      return getExclusiveOwnerThread() == Thread.currentThread();
    }

The implementation simply checks whether the current thread is the thread that holds the lock.

Let's look at the implementation of doSignal (first)


private void doSignal(Node first) {
      do {
        // Advance the head pointer by one node; if there is no next node, also clear the tail pointer, since the queue is now empty
        if ( (firstWaiter = first.nextWaiter) == null)
          lastWaiter = null;
        // Detach the old head node from the wait queue
        first.nextWaiter = null;
        // If the "activation" fails, take the next node and retry until one succeeds or the queue is exhausted
      } while (!transferForSignal(first) &&
           (first = firstWaiter) != null);
    }

This method takes the current head node and tries to "activate" it. On failure it moves on to the next node, until one activation succeeds or the queue is exhausted.
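
Because each successful call removes exactly one node, starting from the head, the wait queue shrinks by one per signal() and waiters resume in the order they started waiting. The sketch below, with a hypothetical class SignalOrderDemo, queues three waiters one after another and logs the queue length reported by ReentrantLock#getWaitQueueLength(Condition) after each signal().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class SignalOrderDemo {
    // Returns the wait-queue length after each signal(), followed by the wake-up order.
    static List<String> run() {
        ReentrantLock lock = new ReentrantLock();
        Condition cond = lock.newCondition();
        List<String> log = new ArrayList<>(); // guarded by lock
        List<Thread> waiters = new ArrayList<>();
        String[] names = {"A", "B", "C"};

        for (int i = 0; i < names.length; i++) {
            String name = names[i];
            Thread t = new Thread(() -> {
                lock.lock();
                try {
                    cond.awaitUninterruptibly();
                    log.add("woke " + name);
                } finally {
                    lock.unlock();
                }
            });
            waiters.add(t);
            t.start();
            // Wait until this thread is queued on the condition before starting the next one.
            while (waitQueueLength(lock, cond) != i + 1) {
                Thread.yield();
            }
        }

        lock.lock();
        try {
            for (int i = 0; i < names.length; i++) {
                cond.signal(); // transfers one node: the current head
                log.add("waiting " + lock.getWaitQueueLength(cond));
            }
        } finally {
            lock.unlock();
        }
        try {
            for (Thread t : waiters) t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return log;
    }

    // getWaitQueueLength may only be called while holding the lock.
    static int waitQueueLength(ReentrantLock lock, Condition cond) {
        lock.lock();
        try {
            return lock.getWaitQueueLength(cond);
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
        // prints "[waiting 2, waiting 1, waiting 0, woke A, woke B, woke C]"
    }
}
```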

Let's look at the transferForSignal (first) method


final boolean transferForSignal(Node node) {
      // If the state cannot be changed from CONDITION to 0, the node has been cancelled
      if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;
      // Append the node to the tail of the sync queue; p is its predecessor
      Node p = enq(node);
      int ws = p.waitStatus;
      // If the predecessor is cancelled or cannot be set to SIGNAL, wake the thread directly
      if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
      return true;
    }

(1) The method first changes the node state from CONDITION to 0, because a node that entered the sync queue still in CONDITION state would not be recognized when it is time to wake it. If the state cannot be changed, the node has been cancelled and false is returned.
(2) The node is then appended to the tail of the sync queue. So if other nodes are queued ahead of it, calling unlock() will not immediately activate this node.
(3) If the predecessor is in an abnormal state (it is cancelled, or its state cannot be set to SIGNAL), unpark is called directly to wake the thread. The woken thread needs no special handling in await(): it finds its node on the sync queue, leaves the loop, and goes on to acquireQueued(node, savedState) as usual.

This method mainly adds the node to the sync queue; the real wake-up happens when unlock() is called.
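
That signal() only moves the node, and that the waiter resumes only after unlock(), can be checked with ReentrantLock#hasWaiters(Condition) and ReentrantLock#hasQueuedThread(Thread). The class SignalThenUnlockDemo and its event strings are hypothetical names for this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class SignalThenUnlockDemo {
    static List<String> run() {
        ReentrantLock lock = new ReentrantLock();
        Condition cond = lock.newCondition();
        List<String> events = new ArrayList<>(); // guarded by lock

        Thread waiter = new Thread(() -> {
            lock.lock();
            try {
                cond.awaitUninterruptibly();
                events.add("waiter resumed");
            } finally {
                lock.unlock();
            }
        });
        waiter.start();

        boolean signalled = false;
        while (!signalled) {
            lock.lock();
            try {
                if (lock.hasWaiters(cond)) { // the waiter is on the condition wait queue
                    cond.signal();
                    events.add("after signal: hasWaiters=" + lock.hasWaiters(cond)
                            + ", hasQueuedThread=" + lock.hasQueuedThread(waiter));
                    events.add("unlocking");
                    signalled = true;
                }
            } finally {
                lock.unlock(); // only now can the waiter reacquire the lock
            }
            if (!signalled) {
                Thread.yield(); // waiter has not reached await() yet; retry
            }
        }
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return events;
    }

    public static void main(String[] args) {
        for (String e : run()) System.out.println(e);
        // prints:
        // after signal: hasWaiters=false, hasQueuedThread=true
        // unlocking
        // waiter resumed
    }
}
```

Right after signal() the waiter is no longer counted on the condition but is reported as queued on the lock, and its own event is always logged after "unlocking".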

