Resolving ThreadGroup Pitfalls in Thread Pools

  • 2021-11-29 06:48:35
  • OfStack

Contents

  • Pitfalls of ThreadGroup in a thread pool
  • Is ThreadGroup feasible?
  • The Executors inner class DefaultThreadFactory
  • Using ThreadGroup and a hand-written thread pool
  • Listening for abnormal thread shutdown
  • How to get an exception thrown in a thread
  • ThreadGroup
  • Thread pool usage

Pitfalls of ThreadGroup in a thread pool

In Java, every thread belongs to a thread group. For example, a thread created in the main() method belongs to the thread group named main. Simply put, a thread group is a collection of threads (and of other thread groups) that lets them be managed together. It is represented by the class java.lang.ThreadGroup.

A thread group can be defined with the following code.


ThreadGroup group = new ThreadGroup("groupName");
Thread thread = new Thread(group, "the first thread of group");

Some ThreadGroup methods act on the group's threads as a whole. For example, setMaxPriority() sets the highest priority that threads in the group may be given; threads that already have a higher priority are not affected.
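To make the effect of setMaxPriority() concrete, here is a minimal sketch. The class name ThreadGroupBasics and the helper newThreadInCappedGroup are made up for illustration; only the ThreadGroup and Thread calls come from the JDK.

```java
class ThreadGroupBasics {
    /** Creates a group, caps its priority, and returns an unstarted thread created inside it. */
    static Thread newThreadInCappedGroup(String groupName, int maxPriority) {
        ThreadGroup group = new ThreadGroup(groupName);
        group.setMaxPriority(maxPriority);
        Thread thread = new Thread(group, () -> { }, groupName + "-worker");
        // Asking for more than the group allows is not an error;
        // the priority is lowered to the group's maximum.
        thread.setPriority(Thread.MAX_PRIORITY);
        return thread;
    }

    public static void main(String[] args) {
        Thread t = newThreadInCappedGroup("groupName", 3);
        System.out.println(t.getThreadGroup().getName()); // prints groupName
        System.out.println(t.getPriority());              // prints 3
    }
}
```

The cap applies to priorities set after setMaxPriority() is called, which is why the thread ends up at 3 rather than Thread.MAX_PRIORITY.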

Every thread belongs to exactly one thread group: either a default group (when no group is specified) or the group given explicitly when the thread is created. The group is fixed at creation time and cannot be changed afterwards. Every application has at least one thread that belongs to the system thread group. A thread created without an explicit group joins the group of the thread that created it (or the group supplied by the security manager, if one is installed).

Thread groups also form a hierarchy: every group except the root has a parent group. The parent can be given in the constructor; if it is omitted, the new group becomes a child of the current thread's group. Either way, every thread group in an application ultimately has the system thread group as an ancestor.
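The parent chain can be inspected directly. The sketch below is illustrative only: ThreadGroupHierarchy and rootOf are hypothetical names, and the group names printed depend on the JVM and on which thread runs the code.

```java
class ThreadGroupHierarchy {
    /** Walks the parent links until it reaches the root group (the one without a parent). */
    static ThreadGroup rootOf(ThreadGroup group) {
        ThreadGroup current = group;
        while (current.getParent() != null) {
            current = current.getParent();
        }
        return current;
    }

    public static void main(String[] args) {
        // No parent given: the new group becomes a child of the current thread's group.
        ThreadGroup child = new ThreadGroup("child");
        System.out.println("parent: " + child.getParent().getName());
        System.out.println("root:   " + rootOf(child).getName());
    }
}
```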

So if we want to group the threads of a thread pool with a custom ThreadGroup, how should we do it?

When we submit tasks to a thread pool (ThreadPoolExecutor), we add them with execute(Runnable command). Thread itself implements Runnable, so can we achieve the goal by passing the pool new Thread instances that specify a ThreadGroup?

Is ThreadGroup feasible?

Trying to group tasks in a thread pool through new Thread(threadGroup, runnable)


public static void main(String[] args) {
        ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newCachedThreadPool();
        final ThreadGroup group = new ThreadGroup("Main_Test_Group");
        for (int i = 0; i < 5; i++) {
            // Each Thread is created in the custom group but is never started;
            // it is handed to the pool as a plain Runnable.
            Thread thread = new Thread(group, new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.sleep(1000 * 3);
                        System.out.println(Thread.currentThread().getName() + " execution complete");
                        System.out.println("Number of running threads in the current thread group " + group.activeCount());
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }, group.getName() + " #" + i);
            pool.execute(thread);
        }
        // Let the pool's workers exit once the submitted tasks have finished
        pool.shutdown();
    }

Running result

pool-1-thread-3 execution complete
pool-1-thread-1 execution complete
Number of running threads in the current thread group 0
pool-1-thread-2 execution complete
Number of running threads in the current thread group 0
Number of running threads in the current thread group 0
pool-1-thread-4 execution complete
pool-1-thread-5 execution complete
Number of running threads in the current thread group 0
Number of running threads in the current thread group 0

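The results show that the group never had a running thread: every task ran on a pool worker (pool-1-thread-N), and the Thread objects we created were never started. The pool treats each submitted Thread as a plain Runnable and calls its run() method on one of its own worker threads. Therefore, grouping tasks in a thread pool by wrapping them in Thread objects that belong to a thread group is not feasible.

You can see the following constructor in the java.util.concurrent.ThreadPoolExecutor source code: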
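The same observation can be checked programmatically. The following is a minimal sketch, not part of the original test: the class ExecuteThreadAsRunnable and its run() helper are hypothetical, and a single-thread executor is used only to keep the example short.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class ExecuteThreadAsRunnable {
    /**
     * Submits a Thread object to a pool and returns
     * { group name of the thread that ran the body, state of the submitted Thread }.
     */
    static String[] run() {
        ThreadGroup group = new ThreadGroup("Main_Test_Group");
        AtomicReference<String> runnerGroup = new AtomicReference<>();
        Thread submitted = new Thread(group,
                () -> runnerGroup.set(Thread.currentThread().getThreadGroup().getName()),
                "never-started");
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            pool.execute(submitted); // the pool only calls submitted.run() on its own worker
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new String[] { runnerGroup.get(), submitted.getState().toString() };
    }

    public static void main(String[] args) {
        String[] result = run();
        System.out.println("body ran in group: " + result[0]);
        System.out.println("submitted thread state: " + result[1]); // prints NEW
    }
}
```

The submitted Thread stays in state NEW because nobody calls start() on it, and the body runs in whatever group the pool's worker belongs to, not in Main_Test_Group.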


public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }

If we do not specify a ThreadFactory when instantiating ThreadPoolExecutor, its threads are created with the default ThreadFactory.

The Executors inner class DefaultThreadFactory

The following source code is the default Thread factory


static class DefaultThreadFactory implements ThreadFactory {
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;
        DefaultThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                                  Thread.currentThread().getThreadGroup();
            namePrefix = "pool-" +
                          poolNumber.getAndIncrement() +
                         "-thread-";
        }
        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                                  namePrefix + threadNumber.getAndIncrement(),
                                  0);
            if (t.isDaemon())
                t.setDaemon(false);
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
    }

From its only constructor we can see that DefaultThreadFactory takes the thread group from the SecurityManager if one is installed; otherwise it uses the group of the current thread. The method public Thread newThread(Runnable r) then creates each new Thread in that group. So, to place the pool's threads in a group of our own, we can pass a custom ThreadFactory instance to the ThreadPoolExecutor constructor.


import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class MyTheadFactory implements ThreadFactory {
    private static final AtomicInteger poolNumber = new AtomicInteger(1);
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final String namePrefix;
    private final ThreadGroup defaultGroup;
    // Same default as DefaultThreadFactory: security manager's group, else the current thread's group
    public MyTheadFactory() {
        SecurityManager s = System.getSecurityManager();
        defaultGroup = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
        namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-";
    }
    // Every thread created by this factory is placed in the given group
    public MyTheadFactory(ThreadGroup group) {
        this.defaultGroup = group;
        namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-";
    }
    @Override
    public Thread newThread(Runnable r) {
        // Pass the task r to the thread; passing null would create a thread that runs nothing
        Thread t = new Thread(defaultGroup, r, namePrefix + threadNumber.getAndIncrement(), 0);
        if (t.isDaemon())
            t.setDaemon(false);
        if (t.getPriority() != Thread.NORM_PRIORITY)
            t.setPriority(Thread.NORM_PRIORITY);
        return t;
    }
}
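As a usage illustration, the sketch below builds a ThreadPoolExecutor with a group-aware factory and checks which group a worker belongs to. To stay self-contained it uses a small lambda factory (factoryFor, a hypothetical helper) instead of the class above; the pool sizes and the 5-second timeout are arbitrary assumptions.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

class GroupedPoolDemo {
    /** Hypothetical minimal factory: every worker thread is created in the given group. */
    static ThreadFactory factoryFor(ThreadGroup group) {
        AtomicInteger counter = new AtomicInteger(1);
        return r -> new Thread(group, r, group.getName() + "-thread-" + counter.getAndIncrement());
    }

    /** Runs one task in a pool built with the factory and returns the worker's group name. */
    static String workerGroupName(ThreadGroup group) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(), factoryFor(group));
        try {
            Callable<String> task = () -> Thread.currentThread().getThreadGroup().getName();
            Future<String> future = pool.submit(task);
            return future.get(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown(); // workers are non-daemon, so the pool must be shut down
        }
    }

    public static void main(String[] args) {
        System.out.println(workerGroupName(new ThreadGroup("Main_Test_Group"))); // prints Main_Test_Group
    }
}
```

Here the grouping happens where the pool creates its workers, which is the only place a ThreadGroup can take effect for pooled tasks.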

Using ThreadGroup and a hand-written thread pool

Listening for abnormal thread shutdown

The following code is hard to test on Windows; it should be tested on Linux.


//  If the process is killed forcibly (kill -9 PID), the hook does not run and `The thread was killed` is not printed
//  Simulate a normal shutdown with: kill PID
public static void main(String[] args)  {
        Runtime.getRuntime().addShutdownHook(new Thread( () -> {
            System.out.println(" The thread was killed ");
        }));
        while(true){
            System.out.println("i am working ...");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

How to get an exception thrown in a thread


public static void main(String[] args) {
        Thread thread = new Thread(() -> {
            try {
                Thread.sleep(1000);
                int i = 10/0;
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        thread.setUncaughtExceptionHandler((t,e)->{
            System.out.println(" The name of the thread "+ t.getName());
            System.out.println(e);
        });  //  The handler is supplied as a lambda implementing Thread.UncaughtExceptionHandler
        thread.start();
    }
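A thread group can play the same role for all of its threads: ThreadGroup implements Thread.UncaughtExceptionHandler, and a thread without a handler of its own falls back to its group's uncaughtException method. The sketch below assumes a hypothetical subclass RecordingGroup that simply remembers the last exception; it is an illustration, not code from the original article.

```java
import java.util.concurrent.atomic.AtomicReference;

class GroupExceptionHandling {
    /** Hypothetical group that records the last uncaught exception thrown by its threads. */
    static class RecordingGroup extends ThreadGroup {
        final AtomicReference<Throwable> last = new AtomicReference<>();

        RecordingGroup(String name) {
            super(name);
        }

        @Override
        public void uncaughtException(Thread t, Throwable e) {
            last.set(e);
        }
    }

    /** Starts a failing thread inside the group and returns what the group recorded. */
    static Throwable failInGroup() {
        RecordingGroup group = new RecordingGroup("recording");
        Thread thread = new Thread(group, () -> {
            throw new IllegalStateException("boom");
        }, "failing");
        thread.start();
        try {
            thread.join(); // the handler has run by the time the thread has terminated
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return group.last.get();
    }

    public static void main(String[] args) {
        System.out.println(failInGroup()); // prints java.lang.IllegalStateException: boom
    }
}
```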

ThreadGroup

Note: when a ThreadGroup is marked as a daemon group with setDaemon(true), it is destroyed automatically once its last thread has ended. If it is not a daemon group, you need to call destroy() manually.

Thread pool usage

A simple hand-written thread pool implementation

This example makes little use of ThreadGroup itself, but after the workers are closed we can check whether the ThreadGroup still has active threads. Refer to the ThreadGroup API for details.


import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.stream.IntStream;
/**
 * @Author: shengjm
 * @Date: 2020/2/10 9:52
 * @Description:
 */
public class SimpleThreadPool extends Thread{
    /**
     *  Current number of worker threads
     */
    private int size;
    private final int queueSize;
    /**
     *  Default capacity of the task queue
     */
    private final static int DEFAULR_TASK_QUEUE_SIZE = 2000;
    private static volatile int seq = 0;
    private final static String THREAD_PREFIX = "SIMPLE_THREAD_POLL_";
    private final static ThreadGroup GROUP = new ThreadGroup("Pool_Group");
    private final static LinkedList<Runnable> TASK_QUEUE = new LinkedList<>();
    private final static List<WorkerTask> THREAD_QUEUE = new ArrayList<>();
    private final DiscardPolicy discardPolicy;
    private volatile boolean destory = false;
    private int min;
    private int max;
    private int active;
    /**
     *  Default discard policy: reject the task by throwing an exception
     */
    private final static DiscardPolicy DEFAULT_DISCARD_POLICY = () -> {
        throw new DiscardException("The task queue is full; the extra task is discarded");
    };
    /**
     *
     */
    public SimpleThreadPool(){
        this(4,8,12,DEFAULR_TASK_QUEUE_SIZE,DEFAULT_DISCARD_POLICY);
    }
    /**
     *
     */
    public SimpleThreadPool(int min , int active , int max , int queueSize,DiscardPolicy discardPolicy) {
        this.min = min;
        this.active = active;
        this.max = max;
        this.queueSize = queueSize;
        this.discardPolicy = discardPolicy;
        init();
    }
    /**
     *  Initialization: start the minimum number of workers, then the monitor thread
     */
    private void init() {
        for(int i = 0; i < min; i++){
            createWorkTask();
        }
        this.size = min;
        this.start();
    }
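    private void createWorkTask(){
        WorkerTask task = new WorkerTask(GROUP,THREAD_PREFIX+(seq++));
        task.start();
        // Guard the list with the same lock that run() and shutdown() use when iterating it
        synchronized (THREAD_QUEUE) {
            THREAD_QUEUE.add(task);
        }
    }
    /**
     *  Monitor loop: grows the pool when tasks pile up and shrinks it when the queue is empty
     */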
    @Override
    public void run() {
        while(!destory){
            System.out.println(this.min +" --- "+this.active+" --- "+this.max + " --- "+ this.size + " --- "+  TASK_QUEUE.size());
            try {
                Thread.sleep(1000);
                if(TASK_QUEUE.size() > active && size < active){
                    for (int i = size; i < active;i++){
                        createWorkTask();
                    }
                    size = active;
                }else if(TASK_QUEUE.size() > max && size < max){
                    for (int i = size; i < max;i++){
                        createWorkTask();
                    }
                    size = max;
                }
                synchronized (THREAD_QUEUE){
                    if(TASK_QUEUE.isEmpty() && size > active){
                        int release = size - active;
                        for (Iterator<WorkerTask> it = THREAD_QUEUE.iterator();it.hasNext();){
                            if(release <=0){
                                break;
                            }
                            WorkerTask task = it.next();
                            task.close();
                            task.interrupt();
                            it.remove();
                            release--;
                        }
                        size = active;
                    }
                }
            } catch (InterruptedException e) {
                break;
            }
        }
    }
    public void submit(Runnable runnable){
        synchronized (TASK_QUEUE){
            if(destory){
                throw new DiscardException("The thread pool has been destroyed ...");
            }
            if(TASK_QUEUE.size() > queueSize){
                discardPolicy.discard();
            }
            TASK_QUEUE.addLast(runnable);
            TASK_QUEUE.notifyAll();
        }
    }
    /**
     *  Shut down: wait for the queue to drain, then stop every idle worker
     */
    public void shutdown(){
        while(!TASK_QUEUE.isEmpty()){
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        synchronized (THREAD_QUEUE) {
            int initVal = THREAD_QUEUE.size();
            while (initVal > 0) {
                for (WorkerTask workerTask : THREAD_QUEUE) {
                    if (workerTask.getTaskState() == TaskState.BLOCKED) {
                        workerTask.interrupt();
                        workerTask.close();
                        initVal--;
                    } else {
                        try {
                            Thread.sleep(10);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }
            this.destory = true;
        }
    }
    public int getSize() {
        return size;
    }
    public int getMin() {
        return min;
    }
    public int getMax() {
        return max;
    }
    public int getActive() {
        return active;
    }
    /**
     *  Thread state 
     */
    private enum TaskState{
        FREE , RUNNING , BLOCKED , DEAD
    }
    /**
     *  Custom exception class 
     */
    public static class DiscardException extends RuntimeException{
        public DiscardException(String message){
            super(message);
        }
    }
    /**
     *  Define exception policy 
     */
    @FunctionalInterface
    public interface DiscardPolicy{
        void discard() throws DiscardException;
    }
    private static class WorkerTask extends Thread{
        private volatile TaskState taskState = TaskState.FREE;
        public TaskState getTaskState(){
            return this.taskState;
        }
        public WorkerTask(ThreadGroup group , String name){
            super(group , name);
        }
        @Override
        public void run(){
            OUTER:
            while(this.taskState != TaskState.DEAD){
                Runnable runnable;
                synchronized (TASK_QUEUE){
                    while(TASK_QUEUE.isEmpty()){
                        try {
                            taskState = TaskState.BLOCKED;
                            TASK_QUEUE.wait();
                        } catch (InterruptedException e) {
                            break OUTER;
                        }
                    }
                    runnable = TASK_QUEUE.removeFirst();
                }
                if(runnable != null){
                    taskState = TaskState.RUNNING;
                    runnable.run();
                    taskState = TaskState.FREE;
                }
            }
        }
        public void close(){
            this.taskState = TaskState.DEAD;
        }
    }
    /**
     *  Test 
     * @param args
     */
    public static void main(String[] args) {
        SimpleThreadPool simpleThreadPool = new SimpleThreadPool();
//        SimpleThreadPool simpleThreadPool = new SimpleThreadPool(6,15,SimpleThreadPool.DEFAULT_DISCARD_POLICY);
        IntStream.rangeClosed(0,40).forEach(i -> {
            simpleThreadPool.submit(() -> {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("the runnable " + i + " was served by " + Thread.currentThread());
            });
        });
//        try {
//            Thread.sleep(15000);
//        } catch (InterruptedException e) {
//            e.printStackTrace();
//        }
        simpleThreadPool.shutdown();
    }
}
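The check mentioned before the listing, whether a ThreadGroup still has active threads once its workers have finished, could look roughly like this. It is a standalone sketch rather than a change to SimpleThreadPool; GroupActivityCheck and countBeforeAndAfter are hypothetical names, and the latches exist only to make the two measurements deterministic.

```java
import java.util.concurrent.CountDownLatch;

class GroupActivityCheck {
    /** Starts n threads in one group and returns { active count while running, active count after join }. */
    static int[] countBeforeAndAfter(int n) {
        ThreadGroup group = new ThreadGroup("Pool_Group");
        CountDownLatch started = new CountDownLatch(n);
        CountDownLatch release = new CountDownLatch(1);
        Thread[] workers = new Thread[n];
        for (int i = 0; i < n; i++) {
            workers[i] = new Thread(group, () -> {
                started.countDown();
                try {
                    release.await(); // stay alive until the first measurement is taken
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, "worker-" + i);
            workers[i].start();
        }
        try {
            started.await();
            int during = group.activeCount(); // all n workers are alive here
            release.countDown();
            for (Thread worker : workers) {
                worker.join();
            }
            int after = group.activeCount(); // every worker has terminated
            return new int[] { during, after };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        int[] counts = countBeforeAndAfter(4);
        System.out.println("while running: " + counts[0] + ", after join: " + counts[1]);
    }
}
```

Note that the ThreadGroup documentation describes activeCount() as an estimate, so in a pool that keeps creating and stopping workers the value is best used for monitoring rather than for control flow.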
