Java: A Custom Thread Pool and Controlling the Total Number of Threads

  • 2021-08-31 07:49:27
  • OfStack

1 Overview

Pooling is a common idea, and a thread pool is a very typical implementation of it. The book Java Concurrency in Practice also devotes a lot of space to thread pools in Java. This article implements a simple thread pool.

2 Core classes

"1" Interface Definition


public interface IThreadPool<Job extends Runnable> {
 /**
 *  Shut down the thread pool 
 */
 public void shutAlldown();
 
 /**
 *  Execute a task 
 * 
 * @param job  the task to run 
 */
 public void execute(Job job);
 
 /**
 *  Add workers 
 * 
 * @param addNum  number of workers to add 
 */
 public void addWorkers(int addNum);
 
 /**
 *  Reduce workers 
 * 
 * @param reduceNum  number of workers to remove 
 */
 public void reduceWorkers(int reduceNum);
}

"2" Implementation class

The core of a thread pool is maintaining a task list and a worker list.


import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List; 
public class XYThreadPool<Job extends Runnable> implements IThreadPool<Job> { 
 //  Default number of threads 
 private static final int DEAFAULT_SIZE = 5;
 //  Maximum number of threads 
 private static final int MAX_SIZE = 10; 
 //  Task list 
 private LinkedList<Job> tasks = new LinkedList<Job>();
 //  Worker thread list 
 private List<Worker> workers = Collections
  .synchronizedList(new ArrayList<Worker>()); 
 /**
 *  Default constructor 
 */
 public XYThreadPool() {
  this(DEAFAULT_SIZE);
 } 
 /**
 *  Creates a pool with the given number of workers; non-positive values 
 *  fall back to the default and large values are capped at the maximum 
 * 
 * @param workerNum  Number of threads 
 */
 public XYThreadPool(int workerNum) {
  workerNum = workerNum <= 0 ? DEAFAULT_SIZE
    : workerNum > MAX_SIZE ? MAX_SIZE : workerNum;
  initWokers(workerNum);
  //  Register the shutdown hook once per pool, not once per initWokers call 
  Runtime.getRuntime().addShutdownHook(new Thread() {
   @Override
   public void run() {
    shutAlldown();
   }
  });
 } 
 /**
 *  Start the given number of worker threads 
 * 
 * @param threadNums  Number of threads 
 */
 public void initWokers(int threadNums) {
  for (int i = 0; i < threadNums; i++) {
   Worker worker = new Worker();
   worker.start();
   workers.add(worker);
  }
 } 
 @Override
 public void shutAlldown() {
  //  Iterating over a synchronizedList requires holding its lock manually 
  synchronized (workers) {
   for (Worker worker : workers) {
    worker.shutdown();
   }
  }
 } 
 @Override
 public void execute(Job job) {
 synchronized (tasks) {
  //  To submit a task is to add the task object to the task queue and wait for the worker thread to process it 
  tasks.addLast(job);
  tasks.notifyAll();
 }
 } 
 @Override
 public void addWorkers(int addNum) {
  //  The number of new threads must be greater than zero, and the total number of threads cannot exceed the maximum 
  if ((workers.size() + addNum) <= MAX_SIZE && addNum > 0) {
   initWokers(addNum);
  } else {
   System.out.println("addNum must be positive and must not push the pool past MAX_SIZE");
  }
 } 
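 @Override
 public void reduceWorkers(int reduceNum) {
  synchronized (workers) {
   //  Reject non-positive requests and always keep at least one worker 
   if (reduceNum <= 0 || workers.size() - reduceNum <= 0) {
    System.out.println("reduceNum must be positive and leave at least one worker");
    return;
   }
   //  Stop exactly reduceNum workers and drop them from the worker list 
   for (int i = 0; i < reduceNum; i++) {
    Worker w = workers.remove(workers.size() - 1);
    w.shutdown();
   }
  }
 } 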
 /**
 *  Worker thread 
 */
 class Worker extends Thread { 
  private volatile boolean flag = true; 
  @Override
  public void run() {
   while (flag) {
    Job job;
    //  Hold the lock only while touching the shared task queue 
    synchronized (tasks) {
     //  While the queue is empty, wait; wait() releases the lock on tasks 
     while (flag && tasks.isEmpty()) {
      try {
       tasks.wait();
      } catch (InterruptedException e) {
       //  shutdown() interrupts a waiting worker; the loop condition re-checks flag 
      }
     }
     if (!flag) {
      return;
     }
     job = tasks.removeFirst();
    }
    //  Run the job outside the lock so other workers can take tasks concurrently 
    System.out.println("get job:" + job + ",do biz");
    try {
     job.run();
    } catch (RuntimeException e) {
     //  A failing job must not kill the worker thread 
     e.printStackTrace();
    }
   }
  } 
  public void shutdown() {
   flag = false;
   //  Wake the worker if it is blocked in wait() 
   interrupt();
  }
 }
}

(1) When a thread calls wait(), it releases the object's lock and joins the set of threads waiting on that object. Only after another thread calls notify() or notifyAll() on the same object does the waiting thread move on to compete for the object's lock again.

(2) Methods of Object: void notify() wakes up one thread waiting on the object. void notifyAll() wakes up all threads waiting on the object.

notifyAll makes every thread waiting on the object leave the wait state and start waiting for the object's lock instead. Once the lock is released, they compete for it.

notify selects just one waiting thread and wakes it so that it can go on to acquire the object's lock, without disturbing the other threads waiting on the same object. When that first thread finishes and releases the lock, the remaining threads stay in the wait state unless notify is called again, even if the lock is free, because they have not been notified. They are waiting for a notify or notifyAll call, not for the lock.
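To make the wait/notifyAll handshake concrete, here is a minimal sketch that is separate from the pool above. The class WaitNotifyDemo and its methods take, put, and runDemo are names invented for this illustration. Consumers block in wait() while the queue is empty, and the producer calls notifyAll() after each insert.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the pool above.
class WaitNotifyDemo {
    private final LinkedList<Integer> queue = new LinkedList<>();

    // Blocks until an item is available. wait() releases the monitor on queue
    // while blocked and re-acquires it before returning.
    int take() throws InterruptedException {
        synchronized (queue) {
            while (queue.isEmpty()) {   // loop guards against spurious wakeups
                queue.wait();
            }
            return queue.removeFirst();
        }
    }

    void put(int item) {
        synchronized (queue) {
            queue.addLast(item);
            queue.notifyAll();          // wake every waiter; each re-checks the condition
        }
    }

    // Starts `consumers` threads that each take one item, then puts one item
    // per consumer (1, 2, ..., consumers). Returns the sum of the items taken.
    static int runDemo(int consumers) {
        WaitNotifyDemo demo = new WaitNotifyDemo();
        AtomicInteger sum = new AtomicInteger();
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < consumers; i++) {
            Thread t = new Thread(() -> {
                try {
                    sum.addAndGet(demo.take());
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            t.start();
            threads.add(t);
        }
        for (int i = 1; i <= consumers; i++) {
            demo.put(i);
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return sum.get();
    }

    public static void main(String[] args) {
        System.out.println(runDemo(3)); // prints 6 (1 + 2 + 3)
    }
}
```

Note that take() holds the lock only while it touches the queue, which is the same discipline a pool worker should follow when it removes a task.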

3 Without controlling the total number of threads

Each caller creates its own thread pool with 10 worker threads, so the total number of threads grows with the number of callers.


public class TestService1 {
 public static void main(String[] args) {
 //  Start 10 Threads 
 XYThreadPool<Runnable> pool = new XYThreadPool<Runnable>(10);
 pool.execute(new Runnable() {
  @Override
  public void run() {
  System.out.println("====1 test====");
  }
 }); 
 }
} 
public class TestService2 {
 public static void main(String[] args) {
 //  Start 10 Threads 
 XYThreadPool<Runnable> pool = new XYThreadPool<Runnable>(10);
 pool.execute(new Runnable() {
  @Override
  public void run() {
  System.out.println("====2 test====");
  }
 });
 }
}

4 Controlling the total number of threads

All code in a project that needs threads generally shares one thread pool with a fixed number of workers.


import javax.annotation.PostConstruct;
import org.springframework.stereotype.Component;
import com.xy.pool.XYThreadPool; 
/**
 *  Unified thread pool management class  
 */
@Component
public class XYThreadManager { 
 private XYThreadPool<Runnable> executorPool; 
 @PostConstruct
 public void init() {
 executorPool = new XYThreadPool<Runnable>(10);
 } 
 public XYThreadPool<Runnable> getExecutorPool() {
 return executorPool;
 }
} 
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service; 
@Service("testService3")
public class TestService3 { 
 @Autowired
 private XYThreadManager threadManager; 
 public void test() {
 threadManager.getExecutorPool().execute(new Runnable() {
  @Override
  public void run() {
  System.out.println("====3 test====");
  }
 });
 }
} 
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service; 
@Service("testService4")
public class TestService4 { 
 @Autowired
 private XYThreadManager threadManager; 
 public void test() {
 threadManager.getExecutorPool().execute(new Runnable() {
  @Override
  public void run() {
  System.out.println("====4 test====");
  }
 });
 }
} 
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext; 
public class TestMain { 
 @SuppressWarnings("resource")
 public static void main(String[] args) {
 ApplicationContext atc = new ClassPathXmlApplicationContext("applicationContext.xml"); 
 TestService3 t3 = (TestService3) atc.getBean("testService3");
 t3.test(); 
 TestService4 t4 = (TestService4) atc.getBean("testService4");
 t4.test();
 } 
}

Supplement: How to Elegantly Customize a ThreadPoolExecutor Thread Pool

Preface

Everyone has probably used thread pools, and the JDK's Executors class comes with ready-made ones. But have you ever thought about how to use a thread pool in the most elegant way, and how a thread pool for a production environment should be configured sensibly?

I had some time this weekend to sum up what I consider "elegant". If you find any problems, please correct me.

Thread pool usage rules

To use thread pools well, you must follow several rules:

Setting the number of threads

Thread pool parameter configuration

Embedding your own behavior with hooks

Shutting down the thread pool

Thread pool configuration

Setting the Thread Pool Size

This is actually a common interview topic. Many interviewers will ask how you would size a thread pool (coreSize) to probe your understanding of thread pools.

To answer this, we must first determine whether our workload is compute-intensive or IO-intensive. Only then can we choose a sensible limit for the number of threads in the pool.

1. Compute-intensive:

As the name implies, the application needs a lot of CPU computing resources. In the multi-core era, we should let every CPU core take part in the computation and make full use of the CPU, so that the server's capacity is not wasted. Running a single-threaded program on a powerful server is a great waste. The throughput of a compute-intensive application depends entirely on the number of CPU cores, so to use them fully while avoiding excessive thread context switching, the ideal scheme is:

Number of threads = CPU cores + 1. It can also be set to CPU cores * 2, depending on the JDK version and the CPU configuration (for example, whether the server's CPU has hyper-threading).

In general, setting it to CPU cores * 2 is enough.

2. IO-intensive:

Most of what we develop today is web applications, which involve a lot of network transmission. Interaction with databases and caches also involves IO. Once IO occurs, the thread enters a waiting state, and it continues executing only when the IO ends and the data is ready. For IO-intensive applications we can therefore configure more threads in the pool, so that other threads can do useful work while some wait for IO, which improves concurrent throughput. So can the number of threads in the pool be set arbitrarily? Of course not. Remember that thread context switching comes at a price. A commonly used formula for IO-intensive applications is:

Number of threads = CPU cores / (1 - blocking coefficient). The blocking coefficient is generally between 0.8 and 0.9; you can simply take 0.8 or 0.9.

Applying the formula with a blocking coefficient of 0.9, the ideal number of threads for a dual-core CPU is 2 / (1 - 0.9) = 20. Of course, this is not absolute and needs to be adjusted to the actual situation and workload: final int poolSize = (int) (cpuCore / (1 - 0.9));

As for the blocking coefficient, the book "Programming Concurrency on the JVM" (published in Chinese as "Java Virtual Machine Concurrent Programming") says:

For the blocking coefficient, we can start with a guess, or use profiling tools or the java.lang.management API to determine the ratio of the time threads spend on system/IO operations to the time they spend on CPU-intensive work.
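Both sizing rules fit in a few lines. This is a rough sketch: PoolSizing, cpuBoundThreads, and ioBoundThreads are hypothetical names, and the blocking coefficient is something you have to estimate for your own workload.

```java
// Hypothetical helper; the class and method names are illustrative only.
class PoolSizing {
    // Compute-intensive work: CPU cores + 1.
    static int cpuBoundThreads(int cores) {
        return cores + 1;
    }

    // IO-intensive work: cores / (1 - blocking coefficient),
    // where the coefficient must lie in [0, 1).
    static int ioBoundThreads(int cores, double blockingCoefficient) {
        if (blockingCoefficient < 0 || blockingCoefficient >= 1) {
            throw new IllegalArgumentException("blocking coefficient must be in [0, 1)");
        }
        // Math.round avoids an off-by-one caused by floating-point error.
        return (int) Math.round(cores / (1 - blockingCoefficient));
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        System.out.println("compute-intensive: " + cpuBoundThreads(cores));
        System.out.println("IO-intensive (0.9): " + ioBoundThreads(cores, 0.9));
        System.out.println("dual core, 0.9: " + ioBoundThreads(2, 0.9)); // 20
    }
}
```

Treat the result as a starting point for load testing rather than a final answer.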

Thread pool parameter configuration

On this topic, the first and most important rule is: never choose configuration options that have no upper limit.

This is why the factory methods in Executors are not recommended for creating thread pools.

Examples are Executors.newCachedThreadPool, which places no limit on the number of threads, and any configuration that uses an unbounded queue. When something unexpected goes wrong, the number of threads can spike or the task queue can keep growing until memory is exhausted and the system crashes. We recommend a custom thread pool to avoid this problem; this is the first rule of using thread pools. Be careful not to make a big mistake, and don't be overconfident!

Take a look at the thread pool factory methods in Executors, for example:


// Uses an unbounded queue
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

// The number of threads is effectively unlimited (up to Integer.MAX_VALUE)
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

The others are not listed here; you can read the source code yourself.
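By contrast, a pool built directly with the ThreadPoolExecutor constructor can put an upper limit on both the thread count and the queue length. A minimal sketch; the class name BoundedPoolExample and all of the numbers are placeholders chosen for illustration, not recommendations:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical example class; every number below is a placeholder.
class BoundedPoolExample {
    static ThreadPoolExecutor newBoundedPool() {
        return new ThreadPoolExecutor(
                4,                                     // corePoolSize
                8,                                     // maximumPoolSize: hard upper limit on threads
                60L, TimeUnit.SECONDS,                 // idle time before extra threads are reclaimed
                new ArrayBlockingQueue<>(100),         // bounded queue: at most 100 waiting tasks
                new ThreadPoolExecutor.AbortPolicy()); // reject loudly once saturated
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = newBoundedPool();
        pool.execute(() ->
                System.out.println("running in " + Thread.currentThread().getName()));
        pool.shutdown();
    }
}
```

With these settings the pool never holds more than 8 threads and 100 queued tasks; anything beyond that is rejected instead of consuming more memory.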

Second, set the number of threads and the idle keep-alive time sensibly, based on how often tasks arrive and how long they run, to avoid frequently destroying and recreating threads. Although we use a thread pool to improve performance and throughput, we must also consider system stability; otherwise unexpected problems will be very troublesome!

Third, choose a rejection policy that suits the actual scenario and compensate for rejected tasks. Don't blindly rely on the policies built into the JDK; prefer a custom rejection policy as the fallback.

Fourth, a custom rejection policy is written by implementing the RejectedExecutionHandler interface.

The JDK comes with the following rejection policies:

AbortPolicy: Throws a RejectedExecutionException directly, so the rejection surfaces to the caller. This is the default policy.

CallerRunsPolicy: Runs the rejected task directly in the caller's thread, as long as the thread pool has not been shut down.

DiscardOldestPolicy: Drops the oldest unprocessed request in the queue and then tries to submit the current task again.

DiscardPolicy: Silently discards the task that cannot be processed.
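As an illustration of a custom policy, the sketch below implements RejectedExecutionHandler with a handler that counts and logs rejected tasks. CountingRejectionHandler and its demo method are hypothetical; a real handler would more likely persist the task or hand it to a retry mechanism.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical handler: counts and logs rejected tasks instead of throwing.
class CountingRejectionHandler implements RejectedExecutionHandler {
    final AtomicInteger rejected = new AtomicInteger();

    @Override
    public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) {
        int n = rejected.incrementAndGet();
        // A real handler might store the task or push it to a retry queue here.
        System.err.println("Rejected task #" + n + ": " + task);
    }

    // Saturates a 1-thread pool with a 1-slot queue and returns the rejection count.
    static int demo() {
        CountingRejectionHandler handler = new CountingRejectionHandler();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1), handler);
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        pool.execute(blocker);   // occupies the single worker thread
        pool.execute(blocker);   // fills the single queue slot
        pool.execute(blocker);   // no free thread, no queue space: rejected
        release.countDown();     // let the blocked tasks finish
        pool.shutdown();
        return handler.rejected.get();
    }

    public static void main(String[] args) {
        System.out.println("rejected = " + demo()); // rejected = 1
    }
}
```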

Using hooks

Use hooks to leave a trace of the thread pool's execution:

ThreadPoolExecutor provides protected hook methods that can be overridden, letting you run your own logic before and after each task executes. You can use them to initialize ThreadLocals, gather statistics, or write log entries. beforeExecute and afterExecute are such hooks. There is also a hook, terminated, that lets you insert logic once the executor has fully terminated.

If a hook method throws an exception, the internal worker thread may in turn fail and terminate abruptly.

We can use beforeExecute and afterExecute to record what happens before and after each task runs, and we can also send that information directly to ELK or another logging system.
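One possible way to use these hooks is to time each task. In this sketch, TimingThreadPool is an invented subclass, and printing to standard output stands in for whatever logging system is actually used:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical subclass that times each task and counts finished ones.
class TimingThreadPool extends ThreadPoolExecutor {
    private final ThreadLocal<Long> startNanos = new ThreadLocal<>();
    final AtomicInteger finished = new AtomicInteger();

    TimingThreadPool(int threads, int queueCapacity) {
        super(threads, threads, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueCapacity));
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        startNanos.set(System.nanoTime());   // runs in worker thread t
        super.beforeExecute(t, r);
    }

    @Override
    protected void afterExecute(Runnable r, Throwable ex) {
        super.afterExecute(r, ex);
        long micros = (System.nanoTime() - startNanos.get()) / 1_000;
        startNanos.remove();
        System.out.println(r + " took " + micros + " us"
                + (ex != null ? ", failed: " + ex : ""));
        finished.incrementAndGet();
    }

    @Override
    protected void terminated() {
        System.out.println("pool terminated, tasks finished: " + finished.get());
        super.terminated();
    }

    // Runs `tasks` empty tasks and returns how many afterExecute calls were seen.
    static int demo(int tasks) {
        TimingThreadPool pool = new TimingThreadPool(2, tasks);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> { });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return pool.finished.get();
    }

    public static void main(String[] args) {
        System.out.println("finished = " + demo(5));
    }
}
```

The hooks run in the worker thread that executes the task, which is why a ThreadLocal is enough to carry the start time from beforeExecute to afterExecute.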

Shutting down the thread pool

When a thread pool is no longer referenced and has no worker threads left, it is shut down automatically. We can also call shutdown to terminate the thread pool manually. If we forget to call shutdown, we can set keepAliveTime and allowCoreThreadTimeOut so that idle threads eventually exit and their resources are released!

Of course, the safe approach is to register a JVM shutdown hook with Runtime.getRuntime().addShutdownHook and call the thread pool's shutdown method from it!
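A short sketch of both safety nets together, assuming a pool with placeholder sizes; SelfCleaningPool and create are illustrative names:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical factory; the sizes and timeout are placeholders.
class SelfCleaningPool {
    static ThreadPoolExecutor create() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 4, 30L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(50));
        // Let core threads time out as well, so an idle pool ends up with no threads.
        pool.allowCoreThreadTimeOut(true);
        // Safety net: shut the pool down when the JVM exits.
        Runtime.getRuntime().addShutdownHook(new Thread(pool::shutdown));
        return pool;
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = create();
        pool.execute(() -> System.out.println("task done"));
        // No explicit shutdown here: the idle threads exit after 30 seconds,
        // and then the JVM can terminate.
    }
}
```

allowCoreThreadTimeOut(true) requires a keep-alive time greater than zero, which is why the sketch passes 30 seconds.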

Thread pool usage example

Thread pool core code:


// Assumed dependency: SLF4J for logging
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class AsyncProcessQueue { 
 // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 
 /**
 * Task wrapper class <br>
 *  Its purpose is to log exceptions that might otherwise be swallowed by the Executor <br>
 */
 public static class TaskWrapper implements Runnable {
 private static final Logger _LOGGER = LoggerFactory.getLogger(TaskWrapper.class); 
 private final Runnable gift; 
 public TaskWrapper(final Runnable target) {
  this.gift = target;
 } 
 @Override
 public void run() {
 
  //  Catch exceptions so that they are not swallowed inside the Executor 
  if (gift != null) {
 
  try {
   gift.run();
  } catch (Exception e) {
   _LOGGER.error("Wrapped target execute exception.", e);
  }
  }
 }
 } 
 // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 
 /**
 *  Perform the specified task 
 * 
 * @param task
 * @return
 */
 public static boolean execute(final Runnable task) {
 return AsyncProcessor.executeTask(new TaskWrapper(task));
 }
}
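// Assumed dependencies: SLF4J for logging, Apache Commons Lang 3 for BasicThreadFactory
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class AsyncProcessor {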
 static final Logger LOGGER = LoggerFactory.getLogger(AsyncProcessor.class);
 
 /**
 *  Default maximum concurrency <br>
 */
 private static final int DEFAULT_MAX_CONCURRENT = Runtime.getRuntime().availableProcessors() * 2;
 
 /**
 *  Thread pool name format 
 */
 private static final String THREAD_POOL_NAME = "ExternalConvertProcessPool-%d";
 
 /**
 *  Thread factory name 
 */
 private static final ThreadFactory FACTORY = new BasicThreadFactory.Builder().namingPattern(THREAD_POOL_NAME)
  .daemon(true).build();
 
 /**
 *  Default queue size 
 */
 private static final int DEFAULT_SIZE = 500;
 
 /**
 *  Default thread lifetime 
 */
 private static final long DEFAULT_KEEP_ALIVE = 60L;
 
 /**
 * Executor
 */
 private static ExecutorService executor;
 
 /**
 *  Execution queue 
 */
 private static BlockingQueue<Runnable> executeQueue = new ArrayBlockingQueue<>(DEFAULT_SIZE); 
 static {
 //  Create the Executor
 //  The maximum pool size is 4 times the core size (DEFAULT_MAX_CONCURRENT) 
 try {
  executor = new ThreadPoolExecutor(DEFAULT_MAX_CONCURRENT, DEFAULT_MAX_CONCURRENT * 4, DEFAULT_KEEP_ALIVE,
   TimeUnit.SECONDS, executeQueue, FACTORY); 
  //  Close the hook of the event 
  Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { 
  @Override
  public void run() {
   AsyncProcessor.LOGGER.info("AsyncProcessor shutting down."); 
   executor.shutdown(); 
   try { 
    //  Wait up to 1 second for running tasks to finish after shutdown 
   if (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
    AsyncProcessor.LOGGER.error("AsyncProcessor shutdown immediately due to wait timeout.");
    executor.shutdownNow();
   }
   } catch (InterruptedException e) {
   AsyncProcessor.LOGGER.error("AsyncProcessor shutdown interrupted.");
   executor.shutdownNow();
   } 
   AsyncProcessor.LOGGER.info("AsyncProcessor shutdown complete.");
  }
  }));
 } catch (Exception e) {
  LOGGER.error("AsyncProcessor init error.", e);
  throw new ExceptionInInitializerError(e);
 }
 } 
 /**
 *  This type cannot be instantiated 
 */
 private AsyncProcessor() {
 } 
 /**
 *  Execute a task and report whether it was accepted <br>
 *  This is a thin wrapper around {@link java.util.concurrent.Executor#execute(Runnable)} 
 * 
 * @param task
 * @return
 */
 public static boolean executeTask(Runnable task) { 
 try {
  executor.execute(task);
 } catch (RejectedExecutionException e) {
  LOGGER.error("Task executing was rejected.", e);
  return false;
 } 
 return true;
 } 
 /**
 *  Submit a task and obtain its result later through the returned Future <br>
 *  When the submission fails, a  {@link }
 * 
 * @param task
 * @return
 */
 public static <T> Future<T> submitTask(Callable<T> task) { 
 try {
  return executor.submit(task);
 } catch (RejectedExecutionException e) {
  LOGGER.error("Task executing was rejected.", e);
  throw new UnsupportedOperationException("Unable to submit the task, rejected.", e);
 }
 }
}

Usage:


AsyncProcessQueue.execute(new Runnable() {
     @Override
     public void run() {
        //do something
    }
});

Adapt it flexibly to your own usage scenario. I don't use beforeExecute, afterExecute, or a rejection policy here.

