Java thread pool details

  • 2020-05-07 19:43:41
  • OfStack

Starting a thread is relatively expensive because it involves interaction with the operating system, so a thread pool improves performance by reusing threads instead of creating new ones. When a system runs a large number of concurrent threads, performance can drop sharply and the JVM may even crash. The pool's maximum-thread-count parameter keeps the number of concurrent threads in the system from exceeding a set limit.

1. The Executors factory class is used to create thread pools. It contains the following static factory methods, each of which creates a corresponding kind of thread pool. The created pool is an ExecutorService object: its execute method runs a Runnable task, and its submit method accepts either a Runnable or a Callable task and returns a Future. When the pool is no longer needed, call its shutdown() method. After shutdown() is called, the pool no longer accepts new tasks, but it does not terminate until all previously added tasks have finished.
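The submit and shutdown behavior described above can be sketched as follows. This is a minimal illustration, not part of the original examples: the class name SubmitAndShutdownDemo and both helper methods are hypothetical, and the pool size of 2 is an arbitrary choice.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;

// Hypothetical demo class, used only for illustration
public class SubmitAndShutdownDemo {

  // Submits a Callable, blocks on the Future for its result, then shuts the pool down
  static int submitAndGet() {
    ExecutorService pool = Executors.newFixedThreadPool(2);
    try {
      Callable<Integer> task = () -> 6 * 7;
      Future<Integer> future = pool.submit(task);
      return future.get();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } catch (ExecutionException e) {
      throw new IllegalStateException(e);
    } finally {
      pool.shutdown();
    }
  }

  // Returns true if the pool rejects a task that is added after shutdown()
  static boolean rejectsAfterShutdown() {
    ExecutorService pool = Executors.newFixedThreadPool(2);
    pool.shutdown();
    try {
      pool.execute(() -> { });
      return false;
    } catch (RejectedExecutionException e) {
      return true;
    }
  }

  public static void main(String[] args) {
    System.out.println("result: " + submitAndGet());
    System.out.println("rejected after shutdown: " + rejectsAfterShutdown());
  }
}
```

With the default rejection policy, a task added after shutdown() fails with RejectedExecutionException, which is what the second helper checks.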

1, newCachedThreadPool() creates a thread pool that caches its threads. A thread created to run a submitted task (a Runnable or Callable object) is kept in the pool after the task completes, so that it can be reused for tasks submitted later.


import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CacheThreadPool {
  static class Task implements Runnable {
    @Override
    public void run() {
      System.out.println(this + " " + Thread.currentThread().getName() + " AllStackTraces map size: "
          + Thread.getAllStackTraces().size());
    }
  }

  public static void main(String[] args) {
    ExecutorService cacheThreadPool = Executors.newCachedThreadPool();
    
    // Add 3 tasks to the thread pool
    for(int i = 0 ; i < 3; i++) {
      cacheThreadPool.execute(new Task());
    }
    
    // Wait for the 3 tasks to finish, then add 3 more tasks to the thread pool
    try {
      Thread.sleep(3000);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
    
    for(int i = 0 ; i < 3; i++) {
      cacheThreadPool.execute(new Task());
    }

    // Stop accepting new tasks; the pool terminates once the added tasks finish
    cacheThreadPool.shutdown();
  }

}

The output is as follows:


CacheThreadPool$Task@2d312eb9 pool-1-thread-1 AllStackTraces map size: 7
CacheThreadPool$Task@59522b86 pool-1-thread-3 AllStackTraces map size: 7
CacheThreadPool$Task@73dbb89f pool-1-thread-2 AllStackTraces map size: 7
CacheThreadPool$Task@5795cedc pool-1-thread-3 AllStackTraces map size: 7
CacheThreadPool$Task@256d5600 pool-1-thread-1 AllStackTraces map size: 7
CacheThreadPool$Task@7d1c5894 pool-1-thread-2 AllStackTraces map size: 7

The thread objects in the pool are cached and reused when new tasks are executed: the second batch of tasks runs on the same three threads as the first. However, under especially high concurrency, a cached thread pool will still create many thread objects.
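One way to keep the caching behavior while capping the thread count is to build the pool directly with ThreadPoolExecutor. The sketch below is an assumption-laden illustration rather than the article's own method: the class name BoundedCachedPoolDemo, the helper method, the limit of 4 threads, and the choice of CallerRunsPolicy (excess tasks run on the submitting thread) are all hypothetical choices.

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, used only for illustration
public class BoundedCachedPoolDemo {

  // Runs taskCount short tasks on a pool capped at maxThreads and
  // returns the largest number of threads the pool ever held
  static int runAndReportLargestPoolSize(int maxThreads, int taskCount) {
    // Same shape as a cached pool (no core threads, direct hand-off queue,
    // 60 s idle timeout), but with an upper bound on the thread count
    ThreadPoolExecutor pool = new ThreadPoolExecutor(
        0, maxThreads, 60L, TimeUnit.SECONDS,
        new SynchronousQueue<Runnable>(),
        new ThreadPoolExecutor.CallerRunsPolicy());

    for (int i = 0; i < taskCount; i++) {
      pool.execute(() -> {
        try {
          Thread.sleep(50); // simulate a little work
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      });
    }

    pool.shutdown();
    try {
      pool.awaitTermination(10, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return pool.getLargestPoolSize();
  }

  public static void main(String[] args) {
    System.out.println("largest pool size: " + runAndReportLargestPoolSize(4, 20));
  }
}
```

However many tasks are added, the reported pool size should never exceed the configured maximum; tasks that find no free thread are executed by the caller instead.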

2, newFixedThreadPool(int nThreads) creates a thread pool that reuses a fixed number of threads, given by nThreads. Tasks added while all threads are busy wait in a queue until a thread becomes free.


import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class FixedThreadPool {
  static class Task implements Runnable {
    @Override
    public void run() {
      System.out.println(this + " " + Thread.currentThread().getName() + " AllStackTraces map size: "
          + Thread.getAllStackTraces().size());
    }
  }

  public static void main(String[] args) {
    ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);

    // Add 5 tasks to the thread pool (more tasks than threads)
    for (int i = 0; i < 5; i++) {
      fixedThreadPool.execute(new Task());
    }

    // Pause briefly (3 ms), then add 3 more tasks to the thread pool
    try {
      Thread.sleep(3);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }

    for (int i = 0; i < 3; i++) {
      fixedThreadPool.execute(new Task());
    }

    // Stop accepting new tasks; without this the pool's threads keep the JVM alive
    fixedThreadPool.shutdown();
  }

}

Execution results:


FixedThreadPool$Task@7045c12d pool-1-thread-2 AllStackTraces map size: 7
FixedThreadPool$Task@50fa0bef pool-1-thread-2 AllStackTraces map size: 7
FixedThreadPool$Task@ccb1870 pool-1-thread-2 AllStackTraces map size: 7
FixedThreadPool$Task@7392b4e3 pool-1-thread-1 AllStackTraces map size: 7
FixedThreadPool$Task@5bdeff18 pool-1-thread-2 AllStackTraces map size: 7
FixedThreadPool$Task@7d5554e1 pool-1-thread-1 AllStackTraces map size: 7
FixedThreadPool$Task@24468092 pool-1-thread-3 AllStackTraces map size: 7
FixedThreadPool$Task@fa7b978 pool-1-thread-2 AllStackTraces map size: 7

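3, newSingleThreadExecutor() creates a thread pool with only one thread, so tasks are executed one at a time in the order they were added. It behaves like newFixedThreadPool(1).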
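Since no example is given for newSingleThreadExecutor(), here is a minimal sketch of its sequential behavior. The class name SingleThreadExecutorDemo and the helper method executionOrder are hypothetical and exist only for this illustration.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, used only for illustration
public class SingleThreadExecutorDemo {

  // Adds taskCount tasks and returns the order in which they actually ran
  static List<Integer> executionOrder(int taskCount) {
    List<Integer> order = new CopyOnWriteArrayList<>();
    ExecutorService single = Executors.newSingleThreadExecutor();

    for (int i = 0; i < taskCount; i++) {
      final int id = i;
      single.execute(() -> order.add(id));
    }

    single.shutdown();
    try {
      single.awaitTermination(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return order;
  }

  public static void main(String[] args) {
    // The single worker thread runs the tasks in the order they were added
    System.out.println(executionOrder(5)); // prints [0, 1, 2, 3, 4]
  }
}
```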

4, newScheduledThreadPool(int corePoolSize) creates a thread pool with the specified number of threads that can run tasks after a given delay. It can also run a task repeatedly at a fixed period until shutdown() is called to close the thread pool.

Here's an example:


import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ScheduledThreadPool {
  static class Task implements Runnable {
    @Override
    public void run() {
      System.out.println("time " + System.currentTimeMillis() + " " + Thread.currentThread().getName() + " AllStackTraces map size: "
          + Thread.getAllStackTraces().size());
    }
  }

  public static void main(String[] args) {
    ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3);
    
    scheduledExecutorService.schedule(new Task(), 3, TimeUnit.SECONDS);
    
    scheduledExecutorService.scheduleAtFixedRate(new Task(), 3, 5, TimeUnit.SECONDS);
  
    try {
      Thread.sleep(30 * 1000);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
    scheduledExecutorService.shutdown();
  }

}

The output is as follows:


time 1458921795240 pool-1-thread-1 AllStackTraces map size: 6
time 1458921795241 pool-1-thread-2 AllStackTraces map size: 6
time 1458921800240 pool-1-thread-1 AllStackTraces map size: 7
time 1458921805240 pool-1-thread-1 AllStackTraces map size: 7
time 1458921810240 pool-1-thread-1 AllStackTraces map size: 7
time 1458921815240 pool-1-thread-1 AllStackTraces map size: 7
time 1458921820240 pool-1-thread-1 AllStackTraces map size: 7

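The first two lines, 1 ms apart, are the one-shot task and the first run of the periodic task, both started after the 3-second delay. As the remaining timestamps show, the periodic task then runs every 5 seconds.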

5, newSingleThreadScheduledExecutor() creates a scheduled thread pool with only one thread. It behaves like a pool created by calling newScheduledThreadPool(1).
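A minimal sketch of a one-shot delayed task on a single-thread scheduler follows. The class name SingleThreadScheduledDemo, the helper method, and the 200 ms delay are hypothetical choices made for this illustration.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, used only for illustration
public class SingleThreadScheduledDemo {

  // Schedules a one-shot task and returns how many milliseconds passed before it ran
  static long measuredDelayMillis(long delayMillis) {
    ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    long start = System.nanoTime();
    try {
      // The Callable reports the time elapsed between scheduling and execution
      ScheduledFuture<Long> future = scheduler.schedule(
          () -> System.nanoTime() - start, delayMillis, TimeUnit.MILLISECONDS);
      return TimeUnit.NANOSECONDS.toMillis(future.get());
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } catch (ExecutionException e) {
      throw new IllegalStateException(e);
    } finally {
      scheduler.shutdown();
    }
  }

  public static void main(String[] args) {
    System.out.println("task ran after about " + measuredDelayMillis(200) + " ms");
  }
}
```

The measured value is at least the requested delay; how much longer it takes depends on the machine and scheduler load.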

2. ForkJoinPool and ForkJoinTask

ForkJoinPool is an implementation of ExecutorService that supports splitting one task into several small tasks for parallel computation and then merging the results of the small tasks into the overall result. Two commonly used constructors are:

ForkJoinPool(int parallelism) creates a ForkJoinPool with the given parallelism level, that is, the target number of parallel worker threads.

ForkJoinPool() creates a ForkJoinPool that uses the return value of Runtime.getRuntime().availableProcessors() as the parallelism parameter.
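The effect of the two constructors can be checked with getParallelism(). This is a small sketch; the class name ForkJoinPoolParallelismDemo and its helper methods are hypothetical.

```java
import java.util.concurrent.ForkJoinPool;

// Hypothetical demo class, used only for illustration
public class ForkJoinPoolParallelismDemo {

  // Parallelism of a pool created with an explicit level
  static int explicitParallelism(int parallelism) {
    ForkJoinPool pool = new ForkJoinPool(parallelism);
    try {
      return pool.getParallelism();
    } finally {
      pool.shutdown();
    }
  }

  // Parallelism of a pool created with the no-argument constructor
  static int defaultParallelism() {
    ForkJoinPool pool = new ForkJoinPool();
    try {
      return pool.getParallelism();
    } finally {
      pool.shutdown();
    }
  }

  public static void main(String[] args) {
    System.out.println("explicit: " + explicitParallelism(4));
    System.out.println("default: " + defaultParallelism()
        + ", processors: " + Runtime.getRuntime().availableProcessors());
  }
}
```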

ForkJoinTask represents a task that can be split, run in parallel, and merged. It is an abstract class that implements the Future<T> interface and has two commonly used abstract subclasses: RecursiveAction for tasks with no return value and RecursiveTask for tasks that return a value. Extend one of these two abstract classes to implement your own task according to your requirements, and then call ForkJoinPool's submit method to execute it.

The following RecursiveAction example prints the numbers 0 to 299 in parallel.


import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.TimeUnit;

public class ActionForkJoinTask {
  static class PrintTask extends RecursiveAction {
    private static final int THRESHOLD = 50;
    private int start;
    private int end;

    public PrintTask(int start, int end) {
      this.start = start;
      this.end = end;
    }

    @Override
    protected void compute() {
      if (end - start < THRESHOLD) {
        for(int i = start; i < end; i++) {
          System.out.println(Thread.currentThread().getName() + " " + i);
        }
      } else {
        int middle = (start + end) / 2;
        PrintTask left = new PrintTask(start, middle);
        PrintTask right = new PrintTask(middle, end);
        left.fork();
        right.fork();
      }
    }

  }

  public static void main(String[] args) {
    ForkJoinPool pool = new ForkJoinPool();
    
    pool.submit(new PrintTask(0, 300));
    // compute() forks its subtasks without joining them, so wait (up to 2 seconds)
    // for the pool to become idle before shutting it down (requires Java 8+)
    pool.awaitQuiescence(2, TimeUnit.SECONDS);
    
    pool.shutdown();
  }

}

After the task is split into small tasks, each small task's fork() method is called to add it to the ForkJoinPool for parallel execution.
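Because fork() alone does not wait for the subtasks, the caller has to wait for the pool separately. An alternative, sketched below under stated assumptions, is to split with invokeAll(), which returns only after both halves have completed, and to start the root task with pool.invoke(). The class name InvokeAllActionDemo, the FillTask class, and the "store i * 2" workload are hypothetical and chosen only so the result can be checked.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

// Hypothetical demo class, used only for illustration
public class InvokeAllActionDemo {

  // Fills target[i] with i * 2 for every index in [start, end)
  static class FillTask extends RecursiveAction {
    private static final int THRESHOLD = 50;
    private final int[] target;
    private final int start;
    private final int end;

    FillTask(int[] target, int start, int end) {
      this.target = target;
      this.start = start;
      this.end = end;
    }

    @Override
    protected void compute() {
      if (end - start < THRESHOLD) {
        for (int i = start; i < end; i++) {
          target[i] = i * 2;
        }
      } else {
        int middle = (start + end) >>> 1;
        // invokeAll forks the subtasks and returns once both are done
        invokeAll(new FillTask(target, start, middle),
                  new FillTask(target, middle, end));
      }
    }
  }

  // Blocks until the whole array has been filled
  static int[] fill(int size) {
    int[] target = new int[size];
    ForkJoinPool pool = new ForkJoinPool();
    try {
      pool.invoke(new FillTask(target, 0, size));
    } finally {
      pool.shutdown();
    }
    return target;
  }

  public static void main(String[] args) {
    int[] result = fill(300);
    System.out.println("last element: " + result[result.length - 1]); // prints last element: 598
  }
}
```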

The following RecursiveTask example computes the sum of 100 integers in parallel. The array is split recursively until each subtask covers fewer than 20 numbers; each subtask computes a partial sum, and the partial sums are merged into the final result.


import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;

public class TaskForkJoinTask {
  static class CalTask extends RecursiveTask<Integer> {
    private static final int THRESHOLD = 20;

    private int arr[];
    private int start;
    private int end;

    public CalTask(int[] arr, int start, int end) {
      this.arr = arr;
      this.start = start;
      this.end = end;
    }

    @Override
    protected Integer compute() {
      int sum = 0;

      if (end - start < THRESHOLD) {
        for (int i = start; i < end; i++) {
          sum += arr[i];
        }
        System.out.println(Thread.currentThread().getName() + " sum:" + sum);
        return sum;
      } else {
        int middle = (start + end) / 2;
        CalTask left = new CalTask(arr, start, middle);
        CalTask right = new CalTask(arr, middle, end);

        left.fork();
        right.fork();

        return left.join() + right.join();
      }
    }

  }

  public static void main(String[] args) {
    int arr[] = new int[100];
    Random random = new Random();
    int total = 0;

    for (int i = 0; i < arr.length; i++) {
      int tmp = random.nextInt(20);
      total += (arr[i] = tmp);
    }
    System.out.println("total " + total);

    ForkJoinPool pool = new ForkJoinPool(4);

    Future<Integer> future = pool.submit(new CalTask(arr, 0, arr.length));
    try {
      System.out.println("cal result: " + future.get());
    } catch (InterruptedException e) {
      e.printStackTrace();
    } catch (ExecutionException e) {
      e.printStackTrace();
    }
    pool.shutdown();
  }

}

The output is as follows:


total 912
ForkJoinPool-1-worker-2 sum:82
ForkJoinPool-1-worker-2 sum:123
ForkJoinPool-1-worker-2 sum:144
ForkJoinPool-1-worker-3 sum:119
ForkJoinPool-1-worker-2 sum:106
ForkJoinPool-1-worker-2 sum:128
ForkJoinPool-1-worker-2 sum:121
ForkJoinPool-1-worker-3 sum:89
cal result: 912

After the subtasks have been forked, the join() method of each subtask is called to wait for and obtain its result, and the final result is obtained by adding the results together.
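A common variation on this fork/join pattern forks only one half and computes the other half directly on the current thread, so the worker does useful work instead of only waiting. The sketch below is an illustration of that variation, not the code used above: the class name ForkComputeJoinDemo, the SumTask class, and the helper sumOfFirst are hypothetical.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical demo class, used only for illustration
public class ForkComputeJoinDemo {

  static class SumTask extends RecursiveTask<Long> {
    private static final int THRESHOLD = 20;
    private final int[] arr;
    private final int start;
    private final int end;

    SumTask(int[] arr, int start, int end) {
      this.arr = arr;
      this.start = start;
      this.end = end;
    }

    @Override
    protected Long compute() {
      if (end - start < THRESHOLD) {
        long sum = 0;
        for (int i = start; i < end; i++) {
          sum += arr[i];
        }
        return sum;
      }
      int middle = (start + end) >>> 1;
      SumTask left = new SumTask(arr, start, middle);
      SumTask right = new SumTask(arr, middle, end);
      left.fork();                      // run the left half asynchronously
      long rightSum = right.compute();  // compute the right half on this thread
      return left.join() + rightSum;    // wait for the left half and merge
    }
  }

  // Sums the integers 1..n with a ForkJoinPool
  static long sumOfFirst(int n) {
    int[] arr = new int[n];
    for (int i = 0; i < n; i++) {
      arr[i] = i + 1;
    }
    ForkJoinPool pool = new ForkJoinPool(4);
    try {
      return pool.invoke(new SumTask(arr, 0, arr.length));
    } finally {
      pool.shutdown();
    }
  }

  public static void main(String[] args) {
    System.out.println("sum: " + sumOfFirst(100)); // prints sum: 5050
  }
}
```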

