Java Multithreaded CompletionService
- 2021-12-04 18:49:57
- OfStack
1 Introduction to CompletionService
CompletionService is used to submit a group of Callable tasks. Its take method returns the Future object corresponding to a completed Callable task.
Suppose you submit a batch of tasks to an Executor and want to collect the results as the tasks complete. To do this, you could save the Future of each task into a collection and then loop through the collection, calling get() on each Future to take out the data. Fortunately, CompletionService does this for you.
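For comparison, the manual approach described above might look like the following. This is a minimal sketch: the class name ManualFutureLoop, the pool size, and the squaring tasks are illustrative assumptions, not part of the original article.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical example class: collects Futures by hand and reads them in submission order.
class ManualFutureLoop {

    static List<Integer> runAll(int taskCount) throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            // Keep every Future so the results can be fetched later.
            List<Future<Integer>> futures = new ArrayList<>();
            for (int i = 0; i < taskCount; i++) {
                final int id = i;
                Callable<Integer> task = () -> id * id;
                futures.add(pool.submit(task));
            }
            List<Integer> results = new ArrayList<>();
            for (Future<Integer> future : futures) {
                // get() blocks on this particular task, even if later tasks are already done.
                results.add(future.get());
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runAll(5)); // prints [0, 1, 4, 9, 16]
    }
}
```

The loop always waits on tasks in submission order, so a slow early task delays access to results that are already available. That is the bookkeeping CompletionService takes over.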
CompletionService integrates the functions of Executor and BlockingQueue. You submit Callable tasks to it for execution and then obtain each result once it is fully available, using take and poll methods similar to those of a queue, much like a packaged Future.
The Future objects returned by CompletionService's take come back in completion order: whichever task finishes first is returned first, regardless of submission order.
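A small sketch can make the ordering concrete. The class name CompletionOrderDemo and the sleep durations are assumptions chosen for illustration; the slow task is submitted first, yet the fast one is taken first.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical example class: shows that take() follows completion order.
class CompletionOrderDemo {

    static List<String> completionOrder() throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            CompletionService<String> service = new ExecutorCompletionService<>(pool);
            // Submitted first, but finishes last.
            service.submit(() -> { Thread.sleep(500); return "slow"; });
            // Submitted second, but finishes first.
            service.submit(() -> { Thread.sleep(50); return "fast"; });

            List<String> order = new ArrayList<>();
            for (int i = 0; i < 2; i++) {
                // take() blocks until some task has completed, then returns its Future.
                order.add(service.take().get());
            }
            return order;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(completionOrder()); // prints [fast, slow]
    }
}
```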
2 CompletionService source code analysis
First, look at the constructor:
    public ExecutorCompletionService(Executor executor) {
        if (executor == null)
            throw new NullPointerException();
        this.executor = executor;
        this.aes = (executor instanceof AbstractExecutorService) ?
            (AbstractExecutorService) executor : null;
        this.completionQueue = new LinkedBlockingQueue<Future<V>>();
    }
The constructor mainly initializes a blocking queue that stores the completed tasks.
Next, look at the submit methods:
    public Future<V> submit(Callable<V> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<V> f = newTaskFor(task);
        executor.execute(new QueueingFuture(f));
        return f;
    }

    public Future<V> submit(Runnable task, V result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<V> f = newTaskFor(task, result);
        executor.execute(new QueueingFuture(f));
        return f;
    }
As you can see, the Callable task is wrapped in a QueueingFuture, and QueueingFuture is a subclass of FutureTask, so what finally executes is the run() method in FutureTask.
Here is that method:
    public void run() {
        // Check the state and claim the runner slot so the callable runs only once
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    // Invoke the call() method of the wrapped callable
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    // Store the result and complete the future
                    set(result);
            }
        } finally {
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }
You can see that the run() method of FutureTask eventually calls back the call() method of the custom Callable. After execution, the result is handled through set(result):
    /**
     * Sets the result of this future to the given value unless
     * this future has already been set or has been cancelled.
     *
     * <p>This method is invoked internally by the {@link #run} method
     * upon successful completion of the computation.
     *
     * @param v the value
     */
    protected void set(V v) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = v;
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
            finishCompletion();
        }
    }
Following on into the finishCompletion() method, you find a call to done(), which QueueingFuture overrides:
    protected void done() { completionQueue.add(task); }
You can see that this method does only one thing: it adds the finished task to the queue. As long as the queue has elements, a call to take() returns a completed task, from which you can get the result of execution.
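The done() hook can be observed in isolation. The following is a minimal sketch, assuming a hypothetical class DoneHookDemo that runs a FutureTask on the calling thread and records the order of events.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.FutureTask;

// Hypothetical example class: records when call() and done() are invoked.
class DoneHookDemo {

    static List<String> events() {
        List<String> log = new ArrayList<>();
        FutureTask<Integer> task = new FutureTask<Integer>(() -> {
            log.add("call");
            return 42;
        }) {
            @Override
            protected void done() {
                // Invoked by FutureTask once the task has completed.
                log.add("done");
            }
        };
        // Run on the current thread to keep the example deterministic.
        task.run();
        return log;
    }

    public static void main(String[] args) {
        System.out.println(events()); // prints [call, done]
    }
}
```

ExecutorCompletionService relies on this same hook, except that its done() enqueues the finished task instead of writing to a log.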
At this point the principle is clear: retrieving results in completion order is implemented with a queue. Each task's Future is put into the queue when the task finishes, first in, first out, so the order in which tasks finish is the order in which results are obtained.
CompletionService can in fact be regarded as a combination of Executor and BlockingQueue. After it receives the tasks to be executed, the results are obtained through operations similar to the put and take of a BlockingQueue. One implementation of CompletionService is ExecutorCompletionService, which hands the actual computation to an Executor.
In the implementation, ExecutorCompletionService creates a BlockingQueue in its constructor (an unbounded, linked-list-based LinkedBlockingQueue). The role of this BlockingQueue is to hold the Futures of the tasks that the Executor has finished. When a computation completes, the done method of FutureTask is called. When a task is submitted to ExecutorCompletionService, it is first wrapped in a QueueingFuture, which is a subclass of FutureTask that overrides the done method so that the completed task is put into the BlockingQueue. The source code of QueueingFuture is as follows:
    /**
     * FutureTask extension to enqueue upon completion
     */
    private class QueueingFuture extends FutureTask<Void> {
        QueueingFuture(RunnableFuture<V> task) {
            super(task, null);
            this.task = task;
        }
        protected void done() { completionQueue.add(task); }
        private final Future<V> task;
    }
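To see how the pieces fit together, here is a stripped-down sketch of the same idea. MiniCompletionService is a hypothetical class written for this illustration, not the JDK implementation: it subclasses FutureTask directly instead of wrapping the task as QueueingFuture does, and it omits null checks and the poll methods.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical simplified completion service: Executor + BlockingQueue + done() hook.
class MiniCompletionService<V> {
    private final Executor executor;
    // Holds the Futures of finished tasks in completion order.
    private final BlockingQueue<Future<V>> completed = new LinkedBlockingQueue<>();

    MiniCompletionService(Executor executor) {
        this.executor = executor;
    }

    Future<V> submit(Callable<V> task) {
        FutureTask<V> future = new FutureTask<V>(task) {
            @Override
            protected void done() {
                // Enqueue this task as soon as it completes.
                completed.add(this);
            }
        };
        executor.execute(future);
        return future;
    }

    Future<V> take() throws InterruptedException {
        // Blocks until at least one task has completed.
        return completed.take();
    }

    // Small demonstration: the slow task is submitted first, the fast one finishes first.
    static String firstFinished() throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            MiniCompletionService<String> service = new MiniCompletionService<>(pool);
            service.submit(() -> { Thread.sleep(400); return "slow"; });
            service.submit(() -> { Thread.sleep(20); return "fast"; });
            return service.take().get();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(firstFinished()); // prints fast
    }
}
```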
3 Running tasks with CompletionService
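    import java.util.Random;
    import java.util.concurrent.Callable;
    import java.util.concurrent.CompletionService;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorCompletionService;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class CompletionServiceTest {
        public static void main(String[] args) {
            ExecutorService threadPool = Executors.newFixedThreadPool(10);
            CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>(threadPool);
            // Submit 10 tasks; each sleeps for a random time and returns its sequence number
            for (int i = 1; i <= 10; i++) {
                final int seq = i;
                completionService.submit(new Callable<Integer>() {
                    @Override
                    public Integer call() throws Exception {
                        Thread.sleep(new Random().nextInt(5000));
                        return seq;
                    }
                });
            }
            // No new tasks will be accepted; already submitted tasks still run
            threadPool.shutdown();
            // Take the results in the order in which the tasks complete
            for (int i = 0; i < 10; i++) {
                try {
                    System.out.println(
                            completionService.take().get());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (ExecutionException e) {
                    e.printStackTrace();
                }
            }
        }
    }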
Sample output (the order depends on the random sleep times and varies from run to run):
    7
    3
    9
    8
    1
    2
    4
    6
    5
    10
4 CompletionService Summary
Compared with using an ExecutorService directly, CompletionService makes it simpler to collect the results of asynchronous tasks as they complete.
One implementation of CompletionService is ExecutorCompletionService. It fuses the functions of Executor and BlockingQueue: the Executor carries out the computation, and the BlockingQueue holds the completed asynchronous tasks.
CompletionService is a good fit when you run a large number of independent, homogeneous tasks.
CompletionService can put a time limit on waiting for results, mainly through poll(long timeout, TimeUnit unit), which mirrors the BlockingQueue method of the same name. If no task completes within the limit, poll returns null. It does not cancel anything itself, so the caller must cancel unfinished tasks through their Future objects.
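The time limit can be sketched as follows. The class name PollTimeoutDemo, the two-second task, and the returned strings are assumptions made for illustration. Note that the cancellation is done explicitly by the caller.

```java
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical example class: waits a bounded time for a result, then cancels.
class PollTimeoutDemo {

    static String firstResultWithin(long timeoutMillis)
            throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            CompletionService<String> service = new ExecutorCompletionService<>(pool);
            // Keep the Future returned by submit so the task can be cancelled later.
            Future<String> pending = service.submit(() -> {
                Thread.sleep(2000);
                return "late";
            });

            // poll returns null if no task completes within the timeout.
            Future<String> finished = service.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            if (finished == null) {
                // poll does not cancel the task; the caller has to do it.
                pending.cancel(true);
                return "timed out";
            }
            return finished.get();
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(firstResultWithin(100)); // prints timed out
    }
}
```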