Introduction to Java thread interrupts, blocking, and LockSupport

  • 2020-04-01 01:25:47
  • OfStack

Last Friday and over the weekend, I took a break from work to look at the implementation of Thread.interrupt and of LockSupport, which arrived with the Java 5 concurrency utilities.
Before going into the details, let me throw out a few questions.
What is the relationship between Thread.interrupt() and InterruptedException? Does interrupt() trigger an InterruptedException?
In what state does Thread.interrupt() interrupt a thread? RUNNING or BLOCKING?
Does ordinary thread programming need to pay attention to interrupts? How do you usually handle them? What can they be used for?
What is the difference between LockSupport.park()/unpark() and Object.wait()/notify()?
What is the blocker object passed to LockSupport.park(Object blocker) for?
Can LockSupport respond to Thread.interrupt()? Does it throw InterruptedException?
Is there a callback for Thread.interrupt(), something like a hook?
If you can answer all of these questions clearly, you already understand Thread.interrupt and don't need to read on.
If not, let's work through them together with these questions in mind.
Thread provides several interrupt-related methods:
public void interrupt(): interrupts this thread
public boolean isInterrupted(): checks whether this thread has been interrupted and leaves the flag unchanged
public static boolean interrupted(): checks whether the current thread has been interrupted and clears the flag, similar to a getAndReset()
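The difference between the two checks is easy to see when a thread interrupts itself. The following is a minimal sketch; the class name InterruptFlagDemo and the helper observe() are made up for illustration.

```java
class InterruptFlagDemo {
    // Hypothetical helper: returns the three observations in order.
    static boolean[] observe() {
        Thread self = Thread.currentThread();
        self.interrupt();                                // only sets the flag, nothing is thrown
        boolean afterInterrupt = self.isInterrupted();   // reads the flag, leaves it set
        boolean firstStaticCall = Thread.interrupted();  // reads the flag and clears it
        boolean afterClear = self.isInterrupted();       // the flag is now cleared
        return new boolean[] { afterInterrupt, firstStaticCall, afterClear };
    }

    public static void main(String[] args) {
        boolean[] r = observe();
        System.out.println(r[0] + " " + r[1] + " " + r[2]); // prints "true true false"
    }
}
```

Because interrupted() clears the flag, code that calls it should either act on the interrupt or set the flag again.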
Points to understand:
1. Each thread has an interrupt status flag that indicates whether the thread has been interrupted.
2. A call to Thread.interrupt() has one of two effects:
If the thread is blocked in a low-level blocking call such as Object.wait(), Thread.sleep() or Thread.join(), the block ends immediately and the blocked call throws InterruptedException.
In other cases, Thread.interrupt() only sets the status flag. The worker thread can then check Thread.isInterrupted() and react accordingly, for example by throwing InterruptedException itself, cleaning up state, or cancelling the task.
The Javadoc for Thread.interrupt() describes this behavior in detail.
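The blocked case can be tried out with a thread that sleeps far longer than the demo runs. This is only a sketch: SleepInterruptDemo and run() are illustrative names, and the 200 ms pause just makes it likely that the sleeper is already blocked when the interrupt arrives.

```java
import java.util.concurrent.atomic.AtomicReference;

class SleepInterruptDemo {
    // Hypothetical helper: starts a sleeping thread, interrupts it, reports what it saw.
    static String run() throws InterruptedException {
        final AtomicReference<String> outcome = new AtomicReference<String>("not interrupted");
        Thread sleeper = new Thread(new Runnable() {
            public void run() {
                try {
                    Thread.sleep(60000); // blocks much longer than the demo lasts
                } catch (InterruptedException e) {
                    // The flag is cleared when sleep() throws InterruptedException.
                    outcome.set("InterruptedException, flag="
                            + Thread.currentThread().isInterrupted());
                }
            }
        });
        sleeper.start();
        Thread.sleep(200);   // give the sleeper time to block
        sleeper.interrupt(); // ends the sleep early
        sleeper.join();
        return outcome.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run()); // prints "InterruptedException, flag=false"
    }
}
```

Even if the interrupt happened to arrive before sleep() was entered, sleep() would throw as soon as it was called, so the result does not depend on the timing.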
Best practices
IBM has a great article, Java theory and practice: Dealing with InterruptedException, which describes several best practices for handling interrupts.
Don't swallow interrupts. An interrupt is usually handled in one of two ways: either keep propagating the InterruptedException to the caller, or catch it and call Thread.currentThread().interrupt() to restore the interrupt flag so that code higher up the call stack can deal with it.
 
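import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Task is the application's own unit of work; it is not defined in this article.
public class TaskRunner implements Runnable {
    private BlockingQueue<Task> queue;

    public TaskRunner(BlockingQueue<Task> queue) {
        this.queue = queue;
    }

    public void run() {
        try {
            while (true) {
                // take() accepts no arguments; the timed variant is poll(timeout, unit)
                Task task = queue.poll(10, TimeUnit.SECONDS);
                if (task != null) { // poll() returns null when the timeout expires
                    task.execute();
                }
            }
        } catch (InterruptedException e) {
            // Restore the interrupted status
            Thread.currentThread().interrupt();
        }
    }
}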
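The listing above shows the second option, restoring the flag. The first option, propagating the exception, could look roughly like the following sketch. PropagateInterruptDemo, nextItem() and callerSawInterrupt() are illustrative names, and the interrupt is simulated by the thread interrupting itself.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class PropagateInterruptDemo {
    // Option 1: declare InterruptedException and let the caller decide what to do.
    static String nextItem(BlockingQueue<String> queue) throws InterruptedException {
        return queue.take();
    }

    // Hypothetical caller: reports whether the interrupt reached it as an exception.
    static boolean callerSawInterrupt() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<String>();
        Thread.currentThread().interrupt(); // simulate a pending cancellation request
        try {
            nextItem(queue);
            return false;
        } catch (InterruptedException e) {
            return true; // take() refused to block and threw instead
        }
    }

    public static void main(String[] args) {
        System.out.println(callerSawInterrupt()); // prints "true"
    }
}
```

Propagating is usually the simpler choice when the method signature allows a checked exception; restoring the flag is for places such as Runnable.run() where it does not.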

Implementing cancelable tasks with interrupts (using Thread.interrupt() to design tasks that can be cancelled)
 
import java.math.BigInteger;
import java.util.concurrent.BlockingQueue;

public class PrimeProducer extends Thread {
    private final BlockingQueue<BigInteger> queue;

    PrimeProducer(BlockingQueue<BigInteger> queue) {
        this.queue = queue;
    }

    public void run() {
        try {
            BigInteger p = BigInteger.ONE;
            while (!Thread.currentThread().isInterrupted())
                queue.put(p = p.nextProbablePrime());
        } catch (InterruptedException consumed) {
            // Allow the thread to exit
        }
    }

    public void cancel() { interrupt(); } // Request cancellation by interrupting this thread
}

Registering a callback for interrupts
Tasks designed to support cancellation usually poll Thread.isInterrupted() actively. This is intrusive to the business code, and there is still a delay, because the thread has to reach the next checkpoint before it notices the interrupt. Nobody knows when that will be, especially during a blocking socket read (I ran into this with an HttpClient timeout problem).
Let's look at an implementation in which the interrupt is delivered actively through a callback. It is a little more involved and is modeled on the design of InterruptibleChannel.
 
// JDK-internal type; the no-argument interrupt() matches the JDK 6 sources quoted later in this article
import sun.nio.ch.Interruptible;

interface InterruptAble { // Defines the interruptible interface
    public void interrupt() throws InterruptedException;
}

abstract class InterruptSupport implements InterruptAble {
    private volatile boolean interrupted = false;
    private Interruptible interruptor = new Interruptible() {
        public void interrupt() {
            interrupted = true;
            InterruptSupport.this.interrupt(); // Position 3
        }
    };

    public final boolean execute() throws InterruptedException {
        try {
            blockedOn(interruptor); // Position 1
            if (Thread.currentThread().isInterrupted()) { // Already interrupted before we started
                interruptor.interrupt();
            }
            // Run the business code
            bussiness();
        } finally {
            blockedOn(null); // Position 2
        }
        return interrupted;
    }

    public abstract void bussiness();

    public abstract void interrupt();

    // -- sun.misc.SharedSecrets --
    static void blockedOn(Interruptible intr) { // package-private
        sun.misc.SharedSecrets.getJavaLangAccess().blockedOn(Thread.currentThread(), intr);
    }
}

Notes on the code, a few clever points:
Position 1: uses the blockedOn method provided by Sun to bind the Interruptible hook to the current thread.
Position 2: clears the hook after the code has run, so that it cannot affect the next task handled by the same thread when pooling is used.
Position 3: defines what the Interruptible hook does. It calls back into InterruptSupport.this.interrupt(), which subclasses implement with their own business logic, such as closing a socket stream.
Usage:
 
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;

class InterruptRead extends InterruptSupport {
    private FileInputStream in;

    @Override
    public void bussiness() {
        // /dev/urandom never runs out of data on Linux, so the loop never ends by itself
        File file = new File("/dev/urandom");
        try {
            in = new FileInputStream(file);
            byte[] bytes = new byte[1024];
            while (in.read(bytes, 0, 1024) > 0) {
                // Thread.sleep(100);
                // if (Thread.interrupted()) { // The earlier, polling way of checking for an interrupt
                //     throw new InterruptedException("");
                // }
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public FileInputStream getIn() {
        return in;
    }

    @Override
    public void interrupt() {
        // Called from the interrupt hook: closing the channel ends the blocking read
        try {
            in.getChannel().close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static void main(String args[]) throws Exception {
        final InterruptRead test = new InterruptRead();
        Thread t = new Thread() {
            @Override
            public void run() {
                long start = System.currentTimeMillis();
                try {
                    System.out.println("InterruptRead start!");
                    test.execute();
                } catch (InterruptedException e) {
                    System.out.println("InterruptRead end! cost time : " + (System.currentTimeMillis() - start));
                    e.printStackTrace();
                }
            }
        };
        t.start();
        // Let the read run for 3 seconds
        Thread.sleep(3000);
        // Interrupt the reading thread
        t.interrupt();
    }
}
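The public NIO channels give the same effect without touching sun.* classes: according to the InterruptibleChannel documentation, interrupting a thread that is blocked in an I/O operation on such a channel closes the channel and makes the blocked call throw ClosedByInterruptException. The sketch below tries this with a java.nio.channels.Pipe whose source is never written to. ChannelInterruptDemo and run() are illustrative names, not part of the original code.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.Pipe;
import java.util.concurrent.atomic.AtomicReference;

class ChannelInterruptDemo {
    // Hypothetical helper: blocks a thread in a channel read, interrupts it,
    // and reports which exception ended the read.
    static String run() throws Exception {
        final Pipe pipe = Pipe.open();
        final AtomicReference<String> outcome = new AtomicReference<String>("read returned");
        Thread reader = new Thread(new Runnable() {
            public void run() {
                try {
                    // Nothing is ever written to the pipe, so this read blocks.
                    pipe.source().read(ByteBuffer.allocate(16));
                } catch (ClosedByInterruptException e) {
                    outcome.set("ClosedByInterruptException");
                } catch (IOException e) {
                    outcome.set("other IOException: " + e);
                }
            }
        });
        reader.start();
        Thread.sleep(200);  // give the reader time to block in read()
        reader.interrupt(); // the channel's interrupt hook closes the source channel
        reader.join();
        pipe.sink().close();
        return outcome.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run());
    }
}
```

No polling of the interrupt flag is needed here; the blocked read itself is ended by the interrupt.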

JDK source code:
1. The hook provided by Sun is registered in the System class; see the relevant code around line 1125:
 
sun.misc.SharedSecrets.setJavaLangAccess(new sun.misc.JavaLangAccess() {
    public sun.reflect.ConstantPool getConstantPool(Class klass) {
        return klass.getConstantPool();
    }
    public void setAnnotationType(Class klass, AnnotationType type) {
        klass.setAnnotationType(type);
    }
    public AnnotationType getAnnotationType(Class klass) {
        return klass.getAnnotationType();
    }
    public <E extends Enum<E>>
            E[] getEnumConstantsShared(Class<E> klass) {
        return klass.getEnumConstantsShared();
    }
    public void blockedOn(Thread t, Interruptible b) {
        t.blockedOn(b);
    }
});

2. Thread.interrupt()
 
public void interrupt() {
    if (this != Thread.currentThread())
        checkAccess();
    synchronized (blockerLock) {
        Interruptible b = blocker;
        if (b != null) {
            interrupt0(); // Just to set the interrupt flag
            b.interrupt(); // Call back the registered hook
            return;
        }
    }
    interrupt0();
}

 
More
For more about Thread.stop, suspend and resume, and the points to note when using interrupt, see Sun's documentation, for example http://download.oracle.com/javase/6/docs/technotes/guides/concurrency/threadPrimitiveDeprecation.html
Finally, to answer the earlier questions:
Question 1: What is the relationship between Thread.interrupt() and InterruptedException? Does interrupt() trigger an InterruptedException?
Answer: Thread.interrupt() causes an InterruptedException only when the thread is blocked in calls such as Object.wait(), Thread.join() or Thread.sleep(). In other cases it merely sets the thread's interrupt flag, and the program has to check and handle the flag itself.
 
if (Thread.interrupted()) // Clears interrupted status!
    throw new InterruptedException();

Question 2: In what state does Thread.interrupt() interrupt a thread? RUNNING or BLOCKING?
Answer: Thread.interrupt() is mainly designed for threads in a blocked state, such as in wait() or sleep(). However, a program can also be designed to support cancellation in the RUNNING state by checking the flag. Examples of built-in interrupt support include Thread.join() and some NIO channel designs.
Question 3: Does ordinary thread programming need to pay attention to interrupts? How do you usually handle them? What can they be used for?
Answer: Interrupts are used to unblock operations, to support task cancellation, to trigger data cleanup, and so on.
Question 4: What is the difference between LockSupport.park()/unpark() and Object.wait()/notify()?
Answer:
1. They target different things. LockSupport blocks and wakes at the level of a Thread: it can record the object a thread is blocked on, and each unpark() wakes one specific thread. Object.wait() blocks the current thread at the level of an object, and notify()/notifyAll() wakes one arbitrary thread or all threads waiting on that object.
2. They use different mechanisms. Although LockSupport can be given a blocker object, the wait queues of LockSupport and Object.wait() do not intersect. The test code shows this: Object.notifyAll() cannot wake a thread blocked in LockSupport.park().
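The "one specific thread" point can be sketched with two parked threads, of which only one is released. TargetedUnparkDemo, Waiter and onlyFirstWoke() are illustrative names. Since park() is allowed to return spuriously, each waiter re-checks its own flag in a loop.

```java
import java.util.concurrent.locks.LockSupport;

class TargetedUnparkDemo {
    // Hypothetical waiter thread: parks until its own flag is set.
    static class Waiter extends Thread {
        private volatile boolean released;

        @Override
        public void run() {
            while (!released) {          // guard against spurious returns from park()
                LockSupport.park(this);
            }
        }

        void release() {
            released = true;
            LockSupport.unpark(this);    // wakes exactly this thread, nobody else
        }
    }

    // Returns true if the second waiter was still blocked after the first one finished.
    static boolean onlyFirstWoke() throws InterruptedException {
        Waiter a = new Waiter();
        Waiter b = new Waiter();
        a.start();
        b.start();
        a.release();
        a.join();                         // a has finished
        boolean bStillWaiting = b.isAlive();
        b.release();                      // let b finish as well
        b.join();
        return bStillWaiting;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(onlyFirstWoke()); // prints "true"
    }
}
```

With Object.notify() the caller cannot choose which waiting thread wakes up; with unpark(thread) the target is always explicit.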
Question 5: What is the blocker object passed to LockSupport.park(Object blocker) for?
Answer: The blocker is recorded in the thread's parkBlocker field, which makes it easy to see what a thread is blocked on with tools such as the jstack command.
 
public static void park(Object blocker) {
    Thread t = Thread.currentThread();
    setBlocker(t, blocker); // Sets the value of the Thread.parkBlocker field
    unsafe.park(false, 0L);
    setBlocker(t, null); // Clears the value of the Thread.parkBlocker field
}
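Besides jstack, the recorded blocker can be read from code through the public LockSupport.getBlocker(Thread) method, without reflection on the parkBlocker field. A minimal sketch, with ParkBlockerDemo and blockerVisible() as illustrative names:

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

class ParkBlockerDemo {
    // Hypothetical helper: parks a thread on a blocker and reads it back from outside.
    static boolean blockerVisible() throws InterruptedException {
        final Object blocker = new Object();
        final AtomicBoolean release = new AtomicBoolean(false);
        Thread parked = new Thread(new Runnable() {
            public void run() {
                while (!release.get()) {       // re-park after any spurious return
                    LockSupport.park(blocker);
                }
            }
        });
        parked.start();

        // Poll until the thread has recorded its blocker (it is null while not parked).
        Object seen;
        while ((seen = LockSupport.getBlocker(parked)) == null) {
            Thread.sleep(10);
        }

        release.set(true);
        LockSupport.unpark(parked);
        parked.join();
        return seen == blocker;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(blockerVisible()); // prints "true"
    }
}
```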

The LockSupport Javadoc also describes this clearly.
Question 6: Can LockSupport respond to Thread.interrupt()? Does it throw InterruptedException?
Answer: It responds to the interrupt, but it does not throw InterruptedException. The LockSupport Javadoc describes how park() reacts to Thread.interrupt().
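A small sketch of that behavior: the parked thread needs no try/catch, it simply returns from park() and finds its interrupt flag set. ParkInterruptDemo and run() are illustrative names.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

class ParkInterruptDemo {
    // Hypothetical helper: returns the interrupt flag the parked thread saw after waking.
    static boolean run() throws InterruptedException {
        final AtomicBoolean flagSeen = new AtomicBoolean(false);
        Thread parked = new Thread(new Runnable() {
            public void run() {
                // park() does not declare InterruptedException; it just returns.
                while (!Thread.currentThread().isInterrupted()) {
                    LockSupport.park();
                }
                // Unlike sleep() or wait(), returning from park() leaves the flag set.
                flagSeen.set(Thread.currentThread().isInterrupted());
            }
        });
        parked.start();
        Thread.sleep(200);  // give the thread time to park
        parked.interrupt(); // wakes the parked thread
        parked.join();
        return flagSeen.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run()); // prints "true"
    }
}
```

Code built on park() therefore has to check the flag itself after waking and decide whether to throw InterruptedException, as the java.util.concurrent lock classes do.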
  
Related test code
 
package com.agapple.cocurrent;

import java.io.File;
import java.io.FileInputStream;
import java.lang.reflect.Field;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

public class LockSupportTest {

    private static LockSupportTest blocker = new LockSupportTest();

    public static void main(String args[]) throws Exception {
        lockSupportTest();
        parkTest();
        interruptParkTest();
        interruptSleepTest();
        interruptWaitTest();
    }

    // notifyAll() on the blocker object does not wake a thread parked on it
    private static void lockSupportTest() throws Exception {
        Thread t = doTest(new TestCallBack() {
            @Override
            public void callback() throws Exception {
                // Park the current thread, recording blocker as its parkBlocker
                System.out.println("blocker");
                LockSupport.park(blocker);
                System.out.println("wakeup now!");
            }

            @Override
            public String getName() {
                return "lockSupportTest";
            }
        });
        t.start(); // Start the read thread
        Thread.sleep(150);
        synchronized (blocker) {
            // Read the private Thread.parkBlocker field via reflection
            Field field = Thread.class.getDeclaredField("parkBlocker");
            field.setAccessible(true);
            Object fBlocker = field.get(t);
            System.out.println(blocker == fBlocker);
            Thread.sleep(100);
            System.out.println("notifyAll");
            blocker.notifyAll();
        }
    }

    private static void interruptWaitTest() throws InterruptedException {
        final Object obj = new Object();
        Thread t = doTest(new TestCallBack() {
            @Override
            public void callback() throws Exception {
                // wait() must be called while holding the object's monitor
                synchronized (obj) {
                    obj.wait();
                }
                System.out.println("wakeup now!");
            }

            @Override
            public String getName() {
                return "interruptWaitTest";
            }
        });
        t.start(); // Start the read thread
        Thread.sleep(2000);
        t.interrupt(); // Check how the thread responds to an interrupt while in wait()
    }

    private static void interruptSleepTest() throws InterruptedException {
        Thread t = doTest(new TestCallBack() {
            @Override
            public void callback() throws Exception {
                // Try to sleep for 5 s
                Thread.sleep(5000);
                System.out.println("wakeup now!");
            }

            @Override
            public String getName() {
                return "interruptSleepTest";
            }
        });
        t.start(); // Start the read thread
        Thread.sleep(2000);
        t.interrupt(); // Check how the thread responds to an interrupt while in sleep()
    }

    private static void interruptParkTest() throws InterruptedException {
        Thread t = doTest(new TestCallBack() {
            @Override
            public void callback() {
                // Park the current thread for up to 5 s
                LockSupport.parkNanos(blocker, TimeUnit.SECONDS.toNanos(5));
                System.out.println("wakeup now!");
            }

            @Override
            public String getName() {
                return "interruptParkTest";
            }
        });
        t.start(); // Start the read thread
        Thread.sleep(2000);
        t.interrupt(); // Check how the thread responds to an interrupt while parked
    }

    private static void parkTest() throws InterruptedException {
        Thread t = doTest(new TestCallBack() {
            @Override
            public void callback() {
                // Park the current thread
                LockSupport.park(blocker);
                System.out.println("wakeup now!");
            }

            @Override
            public String getName() {
                return "parkTest";
            }
        });
        t.start(); // Start the read thread
        Thread.sleep(2000);
        LockSupport.unpark(t);
        t.interrupt();
    }

    public static Thread doTest(final TestCallBack call) {
        return new Thread() {
            @Override
            public void run() {
                File file = new File("/dev/urandom"); // Endless source of bytes on Linux
                try {
                    FileInputStream in = new FileInputStream(file);
                    byte[] bytes = new byte[1024];
                    while (in.read(bytes, 0, 1024) > 0) {
                        if (Thread.interrupted()) {
                            throw new InterruptedException("");
                        }
                        System.out.println(bytes[0]);
                        Thread.sleep(100);
                        long start = System.currentTimeMillis();
                        call.callback();
                        System.out.println(call.getName() + " callback finish cost : "
                                + (System.currentTimeMillis() - start));
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        };
    }
}

interface TestCallBack {
    public void callback() throws Exception;

    public String getName();
}

Finally
This article only covers usage-level material. It does not explain the Thread mechanisms in terms of the operating system or Sun's native implementation, so readers who are familiar with that part are welcome to share their views.
