A detailed look at Java thread pools, with simple examples of creating one

  • 2020-06-03 06:25:41
  • OfStack

Java thread pool

My recent work on the concurrency features of a project has been rough going. After reading a lot of material, I finally have a deeper understanding, so I am going to summarize the principles of concurrent programming while reading through the source code.

We will start with the most commonly used piece, the thread pool, and follow its whole lifecycle: creation, task execution, and shutdown. From there we will go on to explore atomic variables, concurrent containers, blocking queues, synchronization tools, locks, and more. The tools in java.util.concurrent are not hard to use, but knowing how to use them is not enough; we also want to read the source code. By the way, the JDK I use is 1.8.

Executor framework

Executor is the root of the thread pool framework. The interface has only one method, execute, which runs a Runnable task. The ExecutorService interface extends Executor with lifecycle management, adding methods for shutting down the pool, returning task results, and so on. AbstractExecutorService implements ExecutorService and provides default implementations of methods such as submit.

That brings us to today's topic, ThreadPoolExecutor, which extends AbstractExecutorService and provides a concrete thread pool implementation.
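To make the difference between the two interfaces concrete, here is a minimal sketch. The class name ExecutorBasicsDemo and the method submitAndGet are made up for illustration; only execute, submit, Future.get and shutdown are real JDK calls.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class, not part of the JDK.
class ExecutorBasicsDemo {

    // Adds two numbers on a pool thread and waits for the result.
    static int submitAndGet(int a, int b) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            // execute comes from Executor: fire-and-forget, no result.
            pool.execute(() -> System.out.println(
                    "running on " + Thread.currentThread().getName()));

            // submit comes from ExecutorService: it returns a Future.
            Future<Integer> future = pool.submit(() -> a + b);
            return future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            // Lifecycle management is also an ExecutorService addition.
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(submitAndGet(2, 3));
    }
}
```

The finally block matters: a pool that is never shut down keeps its non-daemon worker thread alive and can prevent the JVM from exiting.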

Constructor

Here is the most general constructor of ThreadPoolExecutor, which takes seven arguments. The body is not shown; it is just some parameter checks and field assignments.


public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    // parameter checks and field assignments omitted
}

corePoolSize is the target size of the thread pool: the number of threads the pool tries to keep alive even when there are no tasks to execute. Note that a newly created pool does not start these threads up front; they are created as tasks are submitted, unless prestartAllCoreThreads is called. maximumPoolSize is the upper limit on the number of threads. keepAliveTime is how long an idle thread may live: when the pool has more than corePoolSize threads, threads that stay idle longer than this are reclaimed. unit is the time unit of keepAliveTime. The remaining three parameters are analyzed below.
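As a rough illustration, the sketch below calls the seven-argument constructor with every parameter spelled out. The class name ConstructorDemo and the concrete values (2, 4, 30 seconds, a queue of 10) are arbitrary choices for this example, not recommendations.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; the values below are arbitrary.
class ConstructorDemo {

    static ThreadPoolExecutor newPool() {
        return new ThreadPoolExecutor(
                2,                                     // corePoolSize
                4,                                     // maximumPoolSize
                30L,                                   // keepAliveTime
                TimeUnit.SECONDS,                      // unit
                new ArrayBlockingQueue<Runnable>(10),  // workQueue (bounded)
                Executors.defaultThreadFactory(),      // threadFactory
                new ThreadPoolExecutor.AbortPolicy()); // handler
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = newPool();
        // A fresh pool has no threads until work arrives.
        System.out.println(pool.getPoolSize()); // 0
        pool.execute(() -> { });
        // The first task created one core thread, which stays alive.
        System.out.println(pool.getPoolSize()); // 1
        pool.shutdown();
    }
}
```

If you want the core threads to exist before the first task arrives, ThreadPoolExecutor offers prestartCoreThread and prestartAllCoreThreads.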

Preconfigured thread pools

Executors provides factory methods that create several preconfigured ThreadPoolExecutor instances. The creation parameters of newFixedThreadPool, newSingleThreadExecutor and newCachedThreadPool are analyzed below.

newFixedThreadPool


public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                   0L, TimeUnit.MILLISECONDS,
                   new LinkedBlockingQueue<Runnable>());
  }

newFixedThreadPool sets both corePoolSize and maximumPoolSize to the nThreads value passed in and sets keepAliveTime to 0. Once the pool has grown to that size, the number of threads stays fixed, which suits situations where a stable thread count is required.
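A small sketch of that behavior: no matter how many tasks are submitted, they are all served by at most nThreads worker threads. FixedPoolDemo and workerNames are names invented for this example.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the JDK.
class FixedPoolDemo {

    // Runs taskCount tasks and returns the distinct worker thread names.
    static Set<String> workerNames(int nThreads, int taskCount) {
        Set<String> names = ConcurrentHashMap.newKeySet();
        ExecutorService pool = Executors.newFixedThreadPool(nThreads);
        for (int i = 0; i < taskCount; i++) {
            pool.execute(() -> names.add(Thread.currentThread().getName()));
        }
        pool.shutdown(); // no new tasks; already queued tasks still run
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return names;
    }

    public static void main(String[] args) {
        // Ten tasks, but never more than two distinct thread names.
        System.out.println(workerNames(2, 10));
    }
}
```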

newSingleThreadExecutor


public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
      (new ThreadPoolExecutor(1, 1,
                  0L, TimeUnit.MILLISECONDS,
                  new LinkedBlockingQueue<Runnable>()));
  }

newSingleThreadExecutor is newFixedThreadPool with the thread count fixed at 1, which guarantees that tasks in the pool run serially. Notice that what is returned is a FinalizableDelegatedExecutorService. Take a look at its source code:


static class FinalizableDelegatedExecutorService
    extends DelegatedExecutorService {
    FinalizableDelegatedExecutorService(ExecutorService executor) {
      super(executor);
    }
    protected void finalize() {
      super.shutdown();
    }
  }

FinalizableDelegatedExecutorService extends DelegatedExecutorService and only adds one thing: it shuts the thread pool down when the wrapper object is finalized by the garbage collector. Now look at the source code of DelegatedExecutorService:


static class DelegatedExecutorService extends AbstractExecutorService {
    private final ExecutorService e;
    DelegatedExecutorService(ExecutorService executor) { e = executor; }
    public void execute(Runnable command) { e.execute(command); }
    public void shutdown() { e.shutdown(); }
    public List<Runnable> shutdownNow() { return e.shutdownNow(); }
    public boolean isShutdown() { return e.isShutdown(); }
    public boolean isTerminated() { return e.isTerminated(); }
    //...
  }

The code is simple. DelegatedExecutorService wraps an ExecutorService and exposes only the ExecutorService methods, so the thread pool parameters can no longer be configured. Normally the parameters a pool was created with can be adjusted afterwards, because ThreadPoolExecutor provides setter methods. The whole point of newSingleThreadExecutor is a single-threaded, serial pool; if its size could still be reconfigured, that guarantee would be meaningless.

Executors also provides the unconfigurableExecutorService method, which wraps an ordinary thread pool into a non-configurable one. Call this method if you do not want the pool to be modified by whoever maintains the code after you.
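The following sketch shows what "configurable" means in practice. It relies on the fact that, in the JDK implementation discussed here, newFixedThreadPool returns a plain ThreadPoolExecutor that callers can cast and reconfigure, while the wrapped variants cannot be cast. UnconfigurableDemo and isReconfigurable are hypothetical names.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

// Hypothetical demo class, not part of the JDK.
class UnconfigurableDemo {

    // True if a caller could cast the service and reach the setters.
    static boolean isReconfigurable(ExecutorService service) {
        return service instanceof ThreadPoolExecutor;
    }

    public static void main(String[] args) {
        ExecutorService fixed = Executors.newFixedThreadPool(2);
        ExecutorService single = Executors.newSingleThreadExecutor();
        ExecutorService wrapped = Executors.unconfigurableExecutorService(fixed);

        System.out.println(isReconfigurable(fixed));   // true
        System.out.println(isReconfigurable(single));  // false
        System.out.println(isReconfigurable(wrapped)); // false

        if (fixed instanceof ThreadPoolExecutor) {
            ThreadPoolExecutor tpe = (ThreadPoolExecutor) fixed;
            // Raise the maximum first so core never exceeds it.
            tpe.setMaximumPoolSize(4);
            tpe.setCorePoolSize(4);
        }

        fixed.shutdown(); // also shuts down the pool behind "wrapped"
        single.shutdown();
    }
}
```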

newCachedThreadPool


public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                   60L, TimeUnit.SECONDS,
                   new SynchronousQueue<Runnable>());
  }

newCachedThreadPool creates a cached thread pool whose thread count ranges from 0 to Integer.MAX_VALUE, with an idle timeout of 1 minute. The effect is that an idle thread is reused if one exists; if there is none, a new thread is created; and a thread that stays idle for more than a minute is reclaimed.
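The "no free thread, so create one" rule can be observed with a small sketch. Every task blocks on a latch, so no worker is ever idle and each submission forces a new thread. CachedPoolDemo and poolSizeUnderLoad are invented names, and the cast to ThreadPoolExecutor assumes the JDK implementation shown above.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

// Hypothetical demo class, not part of the JDK.
class CachedPoolDemo {

    // Submits blocking tasks and reports how many threads the pool created.
    static int poolSizeUnderLoad(int tasks) {
        // Assumption: the factory returns a ThreadPoolExecutor, as in the
        // source shown above.
        ThreadPoolExecutor pool =
                (ThreadPoolExecutor) Executors.newCachedThreadPool();
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // keep this worker busy
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return pool.getPoolSize();
        } finally {
            release.countDown(); // let all workers finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(poolSizeUnderLoad(5)); // 5
    }
}
```

This is also why a cached pool is risky under sustained load: nothing stops the thread count from growing with the number of in-flight tasks.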

newScheduledThreadPool

newScheduledThreadPool creates a thread pool that can run tasks after a delay or periodically. It is not covered in this article and will be discussed in a later one.

Waiting queue

The thread limit of newCachedThreadPool is practically unbounded, but system resources are finite, and tasks may always be submitted faster than they can be processed. For this reason ThreadPoolExecutor takes a blocking queue, a BlockingQueue, that holds Runnable tasks waiting for a free thread.

The JDK provides several BlockingQueue implementations. The commonly used ones are:

  • ArrayBlockingQueue: a bounded blocking queue backed by an array
  • LinkedBlockingQueue: a blocking queue backed by a linked list
  • PriorityBlockingQueue: a blocking queue that orders elements by priority
  • SynchronousQueue: a blocking queue that does not store elements

newFixedThreadPool and newSingleThreadExecutor use an unbounded LinkedBlockingQueue by default. Note that if tasks keep being submitted faster than the pool can process them, the waiting queue grows without limit and system resources will eventually run out. It is therefore recommended to use a bounded waiting queue to avoid resource exhaustion. But solving one problem creates another: what happens when the queue is full and a new task arrives? Handling queue saturation is described below.

The SynchronousQueue used by newCachedThreadPool is interesting: it is called a queue, but it does not store elements. To put a task into it, another thread must already be waiting to take that task; one goes in, one comes out, and nothing is ever stored. SynchronousQueue is therefore a handoff mechanism rather than a real queue. newCachedThreadPool creates an unbounded pool that can in theory accept any number of tasks, so using SynchronousQueue as its waiting queue is appropriate.
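The handoff behavior is easy to see in isolation. In this sketch (SynchronousQueueDemo and its two methods are illustrative names), offer fails when no consumer is waiting, while put succeeds as soon as another thread takes the element.

```java
import java.util.concurrent.SynchronousQueue;

// Hypothetical demo class, not part of the JDK.
class SynchronousQueueDemo {

    // With no consumer waiting, offer returns false: there is no capacity.
    static boolean offerWithoutConsumer() {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        return queue.offer("task");
    }

    // With a consumer calling take, put hands the element over directly.
    static String handOff(String item) {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        String[] received = new String[1];
        Thread consumer = new Thread(() -> {
            try {
                received[0] = queue.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        try {
            queue.put(item); // blocks until the consumer takes it
            consumer.join(); // join makes received[0] visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return received[0];
    }

    public static void main(String[] args) {
        System.out.println(offerWithoutConsumer()); // false
        System.out.println(handOff("task"));        // task
    }
}
```

ThreadPoolExecutor uses offer when enqueuing a task, so with a SynchronousQueue a failed offer is what triggers the creation of a new thread.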

Saturation policy

When a bounded waiting queue is full and the pool has already reached maximumPoolSize, the saturation policy takes over. ThreadPoolExecutor implements it through the RejectedExecutionHandler passed to the constructor. If none is passed in, the default defaultHandler is used.


private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
public static class AbortPolicy implements RejectedExecutionHandler {
    public AbortPolicy() { }
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
      throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString());
    }
  }

AbortPolicy is the default implementation. It simply throws a RejectedExecutionException and leaves the handling to the caller. There are several other saturation policies, shown below:
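To see the default policy fire, the sketch below builds a deliberately tiny pool: one thread and a queue with room for one task. AbortPolicyDemo and countRejected are made-up names, and the sizes are chosen only to make saturation easy to reach.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the JDK.
class AbortPolicyDemo {

    // Submits blocking tasks and counts how many are rejected.
    static int countRejected(int submissions) {
        // No handler is passed, so the default AbortPolicy applies.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(1));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        int rejected = 0;
        try {
            for (int i = 0; i < submissions; i++) {
                try {
                    pool.execute(blocker);
                } catch (RejectedExecutionException e) {
                    rejected++; // thread busy and queue full
                }
            }
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return rejected;
    }

    public static void main(String[] args) {
        // One task runs, one waits in the queue, the other three are rejected.
        System.out.println(countRejected(5)); // 3
    }
}
```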


 public static class DiscardPolicy implements RejectedExecutionHandler {
    public DiscardPolicy() { }
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    }
  }

DiscardPolicy's rejectedExecution is an empty method that does nothing: if the queue is full, newly submitted tasks are silently discarded.


 public static class DiscardOldestPolicy implements RejectedExecutionHandler {
    public DiscardOldestPolicy() { }
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
      if (!e.isShutdown()) {
        e.getQueue().poll();
        e.execute(r);
      }
    }
  }

DiscardOldestPolicy evicts the oldest task in the waiting queue (the one at the head) and then resubmits the new task.
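A short sketch of the eviction, again using a one-thread pool with a one-slot queue so that saturation is easy to trigger. DiscardOldestDemo, run, and the task labels "first", "A" and "B" are all invented for this example.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the JDK.
class DiscardOldestDemo {

    // Returns the labels of the tasks that actually ran, in order.
    static List<String> run() {
        List<String> executed = Collections.synchronizedList(new ArrayList<>());
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(1),
                new ThreadPoolExecutor.DiscardOldestPolicy());
        CountDownLatch release = new CountDownLatch(1);

        pool.execute(() -> {            // occupies the only thread
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            executed.add("first");
        });
        pool.execute(() -> executed.add("A")); // fills the queue
        pool.execute(() -> executed.add("B")); // evicts A, takes its slot

        release.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return executed;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [first, B]
    }
}
```

Task A is dropped without any notification, so this policy only makes sense when stale work is genuinely worthless.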


 public static class CallerRunsPolicy implements RejectedExecutionHandler {
    public CallerRunsPolicy() { }
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
      if (!e.isShutdown()) {
        r.run();
      }
    }
  }

The last saturation policy is CallerRunsPolicy, which discards neither new nor old tasks but runs the rejected task directly in the calling thread, that is, the thread that called execute. That is often the main thread, which is then blocked until the task finishes. Unless you have thought through the consequences, use this policy sparingly.
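The sketch below records which thread ends up running a rejected task under CallerRunsPolicy. As before, the pool is kept tiny on purpose, and CallerRunsDemo and threadOfRejectedTask are hypothetical names.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the JDK.
class CallerRunsDemo {

    // Returns the name of the thread that ran the rejected task.
    static String threadOfRejectedTask() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        String[] ranOn = new String[1];
        try {
            pool.execute(blocker); // occupies the only thread
            pool.execute(blocker); // fills the queue
            // Rejected: runs synchronously inside this execute call.
            pool.execute(() -> ranOn[0] = Thread.currentThread().getName());
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return ranOn[0];
    }

    public static void main(String[] args) {
        System.out.println(threadOfRejectedTask()); // main
    }
}
```

One side effect worth knowing: while the caller is busy running the task it cannot submit more work, which acts as a crude form of back-pressure.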

ThreadFactory

Whenever the thread pool needs to create a new thread, it obtains it from the thread factory. If you do not set a thread factory for ThreadPoolExecutor, the default is the one returned by Executors.defaultThreadFactory().

When you print the names of threads in a thread pool, you normally see names like pool-1-thread-1; that name is set by the default thread factory. The threads it creates are ordinary non-daemon threads. If you need something different, implement ThreadFactory and pass it to ThreadPoolExecutor.
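A custom factory can be as small as the sketch below. NamedThreadFactory is a made-up class for illustration (it is not part of the JDK), and the "worker" prefix is an arbitrary choice.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical factory: names threads "<prefix>-<n>" and sets the daemon flag.
class NamedThreadFactory implements ThreadFactory {
    private final String prefix;
    private final boolean daemon;
    private final AtomicInteger counter = new AtomicInteger(1);

    NamedThreadFactory(String prefix, boolean daemon) {
        this.prefix = prefix;
        this.daemon = daemon;
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread t = new Thread(r, prefix + "-" + counter.getAndIncrement());
        t.setDaemon(daemon);
        return t;
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(
                2, new NamedThreadFactory("worker", false));
        pool.execute(() ->
                System.out.println(Thread.currentThread().getName())); // worker-1
        pool.shutdown();
    }
}
```

Meaningful thread names pay off quickly when reading thread dumps or log files from an application that runs several pools.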

It is only by reading the code that you see how much there is to learn in thread pool creation alone. Creating a thread pool is usually a one-liner, but ThreadPoolExecutor offers plenty of room for customization.

Thank you for reading. I hope this helps, and thank you for supporting this site!
