Java Multithreaded CompletionService

  • 2021-12-04 18:49:57
  • OfStack

Contents

  1 Introduction to CompletionService
  2 CompletionService source code analysis
  3 CompletionService Implementation Task
  4 CompletionService Summary

1 Introduction to CompletionService

CompletionService is used to submit a group of Callable tasks. Its take method returns the Future object that corresponds to a completed Callable task.
Suppose you submit a batch of tasks to an Executor and want to collect the results once they complete. To do this, you could save each Future in a collection and then loop over the collection, calling get() on each Future to retrieve the data. Fortunately, CompletionService does this for you.
CompletionService combines the functions of Executor and BlockingQueue. You submit Callable tasks to it for execution and then retrieve results as they become available, using queue-like take and poll methods; each result comes packaged as a Future.
The Future objects returned by take come back in completion order: whichever task finishes first is returned first, regardless of the order of submission.
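
The difference between looping over saved Future objects and using CompletionService can be sketched as follows. This is a minimal illustration, not code from the JDK: the class name OrderingDemo, the helper sleepy, and the delay values are assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class OrderingDemo {
    // Hypothetical helper: a task that sleeps for delayMillis, then returns its id.
    static Callable<Integer> sleepy(int id, long delayMillis) {
        return () -> {
            Thread.sleep(delayMillis);
            return id;
        };
    }

    // Manual approach: keep every Future and call get() in submission order.
    static List<Integer> submissionOrder(long[] delays) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(delays.length);
        try {
            List<Future<Integer>> futures = new ArrayList<>();
            for (int i = 0; i < delays.length; i++) {
                futures.add(pool.submit(sleepy(i, delays[i])));
            }
            List<Integer> results = new ArrayList<>();
            for (Future<Integer> f : futures) {
                results.add(f.get()); // blocks here even if a later task is already done
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }

    // CompletionService approach: take() hands back whichever task finished first.
    static List<Integer> completionOrder(long[] delays) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(delays.length);
        try {
            CompletionService<Integer> cs = new ExecutorCompletionService<>(pool);
            for (int i = 0; i < delays.length; i++) {
                cs.submit(sleepy(i, delays[i]));
            }
            List<Integer> results = new ArrayList<>();
            for (int i = 0; i < delays.length; i++) {
                results.add(cs.take().get());
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        long[] delays = {600, 300, 0}; // task 0 is the slowest, task 2 the fastest
        System.out.println("submission order: " + submissionOrder(delays));
        System.out.println("completion order: " + completionOrder(delays));
    }
}
```

The first list is always [0, 1, 2]. With delays this far apart, the second list should come out as [2, 1, 0], although the exact order depends on thread scheduling.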

2 CompletionService source code analysis

First, look at the constructor:


    public ExecutorCompletionService(Executor executor) {
        if (executor == null)
            throw new NullPointerException();
        this.executor = executor;
        this.aes = (executor instanceof AbstractExecutorService) ?
            (AbstractExecutorService) executor : null;
        this.completionQueue = new LinkedBlockingQueue<Future<V>>();
    }

The constructor mainly initializes a blocking queue that stores the Future objects of completed tasks.

Next, look at the submit methods:


    public Future<V> submit(Callable<V> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<V> f = newTaskFor(task);
        executor.execute(new QueueingFuture(f));
        return f;
    }

    public Future<V> submit(Runnable task, V result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<V> f = newTaskFor(task, result);
        executor.execute(new QueueingFuture(f));
        return f;
    }

As you can see, the Callable task is wrapped in a QueueingFuture. QueueingFuture is a subclass of FutureTask, so what the executor ultimately runs is the run method of FutureTask.

Here is the run method of FutureTask:


public void run() {
    // Check the state and claim the runner slot so that the Callable is run at most once
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                // Invoke the call method of the wrapped Callable
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                setException(ex);
            }
            if (ran)
                // Store the result and complete the future
                set(result);
        }
    } finally {
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}

You can see that the run method of FutureTask eventually calls back the call method of the custom Callable. After call returns, the set method processes the execution result:


    /**
     * Sets the result of this future to the given value unless
     * this future has already been set or has been cancelled.
     *
     * <p>This method is invoked internally by the {@link #run} method
     * upon successful completion of the computation.
     *
     * @param v the value
     */
    protected void set(V v) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = v;
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
            finishCompletion();
        }
    }

Following the finishCompletion method, you find that it calls the done method, which QueueingFuture overrides:

    protected void done() { completionQueue.add(task); }

You can see that this method does only one thing: it adds the Future of the task that has just finished to the queue. As soon as the queue contains an element, a call to take() returns it, and the execution result can be read from it.
It is now clear that retrieving results asynchronously, without waiting on any particular task, is implemented with a queue. When a task finishes, its Future is put into the queue. The queue is first in, first out, so the order in which the tasks finish is the order in which the results are obtained.

CompletionService can in fact be regarded as a combination of Executor and BlockingQueue. When CompletionService receives tasks to execute, the results of those tasks are obtained through the put and take operations of a BlockingQueue. One implementation of CompletionService is ExecutorCompletionService, which hands the actual computation to an Executor.

In its implementation, ExecutorCompletionService creates a BlockingQueue in its constructor (the linked-list-based unbounded LinkedBlockingQueue), whose role is to hold the results of the tasks run by the Executor. When a computation completes, the done method of FutureTask is called. When a task is submitted to ExecutorCompletionService, it is first wrapped in a QueueingFuture, a subclass of FutureTask that overrides the done method so that the result of the computation performed by the Executor is put into the BlockingQueue.

The source code of QueueingFuture is as follows:


    /**
     * FutureTask extension to enqueue upon completion
     */
    private class QueueingFuture extends FutureTask<Void> {
        QueueingFuture(RunnableFuture<V> task) {
            super(task, null);
            this.task = task;
        }
        protected void done() { completionQueue.add(task); }
        private final Future<V> task;
    }
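
The same mechanism can be reproduced in a few lines, which may make the role of done easier to see. The class below is a simplified, hypothetical stand-in written for illustration (MiniCompletionService is not a JDK class and omits most of what ExecutorCompletionService does); it relies only on the documented behaviour that FutureTask calls done() once the task has completed, failed, or been cancelled.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical, simplified illustration of the idea; not the JDK source.
class MiniCompletionService<V> {
    private final Executor executor;
    private final BlockingQueue<Future<V>> completionQueue = new LinkedBlockingQueue<>();

    MiniCompletionService(Executor executor) {
        this.executor = executor;
    }

    Future<V> submit(Callable<V> task) {
        // Override done() so the finished future enqueues itself.
        FutureTask<V> f = new FutureTask<V>(task) {
            @Override
            protected void done() {
                completionQueue.add(this);
            }
        };
        executor.execute(f);
        return f;
    }

    // Blocks until some task has finished, then returns its Future.
    Future<V> take() throws InterruptedException {
        return completionQueue.take();
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        MiniCompletionService<String> service = new MiniCompletionService<>(pool);
        service.submit(() -> { Thread.sleep(500); return "slow"; });
        service.submit(() -> "fast");
        System.out.println(service.take().get());
        System.out.println(service.take().get());
        pool.shutdown();
    }
}
```

Although "slow" is submitted first, the program should print "fast" before "slow", because each future enters the queue only when its task finishes.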

3 CompletionService Implementation Task


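import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CompletionServiceTest {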
    public static void main(String[] args) {

        ExecutorService threadPool = Executors.newFixedThreadPool(10);
        CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>(threadPool);
        for (int i = 1; i <=10; i++) {
            final int seq = i;
            completionService.submit(new Callable<Integer>() {
                @Override
                public Integer call() throws Exception {

                    Thread.sleep(new Random().nextInt(5000));

                    return seq;
                }
            });
        }
        threadPool.shutdown();
        for (int i = 0; i < 10; i++) {
            try {
                System.out.println(
                        completionService.take().get());
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }

    }
}

Output of one run (the order varies between runs because each task sleeps for a random time):

7
3
9
8
1
2
4
6
5
10

4 CompletionService Summary

  • Compared with using an ExecutorService and Future objects directly, CompletionService makes it simpler and more precise to collect the results of asynchronous tasks.
  • One implementation of CompletionService is ExecutorCompletionService. It fuses the functions of Executor and BlockingQueue: the Executor carries out the computation, and the BlockingQueue holds the execution results of the asynchronous tasks.
  • CompletionService is a good fit when you run a large number of independent, homogeneous tasks.
  • CompletionService lets you put a time limit on waiting for a result, mainly through poll(long timeout, TimeUnit unit), which is backed by the BlockingQueue. If no task completes within the limit, poll returns null. It does not cancel anything itself, so the caller decides whether to cancel the unfinished tasks.
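
A time-limited wait can be sketched like this. The class name PollWithTimeoutDemo, the method collectWithin, and the task bodies are assumptions made for the example; note that the timeout applies to each poll call separately, not to the batch as a whole.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class PollWithTimeoutDemo {
    // Collects results until one poll times out, then cancels whatever is still running.
    static List<String> collectWithin(long timeoutMillis) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        CompletionService<String> cs = new ExecutorCompletionService<>(pool);
        List<Future<String>> submitted = new ArrayList<>();
        submitted.add(cs.submit(() -> "quick"));
        submitted.add(cs.submit(() -> { Thread.sleep(10_000); return "slow"; }));

        List<String> results = new ArrayList<>();
        try {
            for (int i = 0; i < submitted.size(); i++) {
                // poll returns null if no task completes within the timeout
                Future<String> done = cs.poll(timeoutMillis, TimeUnit.MILLISECONDS);
                if (done == null) {
                    break;
                }
                results.add(done.get());
            }
        } finally {
            // Cancellation is the caller's job; cancel has no effect on finished tasks.
            for (Future<String> f : submitted) {
                f.cancel(true);
            }
            pool.shutdown();
        }
        return results;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(collectWithin(500)); // prints [quick]
    }
}
```

Keeping the Future objects returned by submit is what makes the cancellation possible, since CompletionService itself offers no way to list or cancel pending tasks.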

