A Java thread pool that blocks producers when the queue is full

  • 2020-04-01 03:17:20
  • OfStack

In general, production is faster than consumption. One detail worth examining is the length of the queue, and how to match the speed of production to the speed of consumption.

A typical producer-consumer model is as follows:

(Figure: a typical producer-consumer model)
In a concurrent environment, using one of the Queue implementations provided by J.U.C makes it easy to ensure thread safety during production and consumption. The important caveat is that the Queue must be given an initial capacity bound; otherwise a producer that produces too quickly can grow the queue without limit and eventually trigger an OutOfMemoryError.

Consider the common case where production is faster than consumption. When the queue is full, we don't want any task to be dropped or skipped. The producer could sleep for a while before retrying the submission, but a better approach is to let the producer block on the submit call and resume as soon as space frees up, so no idle time is wasted. Blocking is easy to get: BlockingQueue was built for exactly this purpose, and both ArrayBlockingQueue and LinkedBlockingQueue can be constructed with a capacity limit.

Conversely, when the queue is empty the consumer cannot get a task. It could poll periodically, but a better practice is to block and wait with BlockingQueue, executing each task as soon as it becomes available. It is advisable to use the timed variant, poll with a timeout parameter, so that the consumer thread exits after the timeout; that way, once the producer has actually stopped producing, consumers do not wait indefinitely.

With that, we have an efficient production-consumption model that supports blocking.
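As a minimal sketch of the model above (the class and method names here are my own, not from the article): the producer blocks in put when the bounded queue is full, and the consumer uses a timed poll so it exits once production stops.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class BoundedProducerConsumer {
    // Bounded queue: put() blocks the producer when full,
    // poll(timeout) lets the consumer exit once production stops.
    static int drain(int taskCount, int capacity) throws InterruptedException {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < taskCount; i++) {
                    queue.put(i); // blocks while the queue is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        int consumed = 0;
        while (true) {
            // Timed poll: give up after 500 ms of silence instead of waiting forever.
            Integer task = queue.poll(500, TimeUnit.MILLISECONDS);
            if (task == null) break;
            consumed++;
        }
        producer.join();
        return consumed;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(drain(100, 10)); // prints 100
    }
}
```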

Wait a minute: since J.U.C already implements thread pools for us, why build this machinery by hand? Wouldn't it be more convenient to simply use an ExecutorService?

Let's take a look at the basic structure of the ThreadPoolExecutor:

(Figure: basic structure of ThreadPoolExecutor)
As you can see, ThreadPoolExecutor already implements the BlockingQueue and consumer parts for us, and using a thread pool directly brings a number of advantages, such as dynamic tuning of the number of threads.

The problem is that even if you pass a bounded BlockingQueue implementation when constructing the ThreadPoolExecutor, the execute method still does not block when the queue is full, because ThreadPoolExecutor calls the queue's non-blocking offer method:


// execute() as implemented in older JDKs; note the non-blocking offer() call
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
        if (runState == RUNNING && workQueue.offer(command)) { // offer() returns false when full
            if (runState != RUNNING || poolSize == 0)
                ensureQueuedTaskHandled(command);
        }
        else if (!addIfUnderMaximumPoolSize(command))
            reject(command); // is shutdown or saturated
    }
}

What we need is this: when a producer submits a task and the queue is full, the producer blocks until the task can be consumed.

The key point is that in a concurrent environment the producer cannot decide whether the queue is full by calling ThreadPoolExecutor.getQueue().size(): the check and the subsequent submission are not atomic, so the queue may fill up (or drain) between the two calls.

In the thread pool implementation, when the queue is full the RejectedExecutionHandler passed to the constructor is invoked to handle the task. The default implementation is AbortPolicy, which simply throws a RejectedExecutionException.
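A small demonstration of that default behavior (the class and method names are illustrative, not from the article): with a single busy worker and a capacity-1 queue, the third submission finds both occupied and is rejected.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class RejectDemo {
    // With one worker blocked and a capacity-1 queue full, the default
    // AbortPolicy makes execute() throw RejectedExecutionException.
    static boolean submitUntilRejected() throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1));
        CountDownLatch block = new CountDownLatch(1);
        try {
            // Occupies the single worker thread.
            pool.execute(() -> { try { block.await(); } catch (InterruptedException ignored) {} });
            pool.execute(() -> {}); // fills the queue
            pool.execute(() -> {}); // no room left: offer() fails, task is rejected
            return false;
        } catch (RejectedExecutionException expected) {
            return true;
        } finally {
            block.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(submitUntilRejected()); // prints true
    }
}
```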

The other rejection policies are not described here, but the one closest to our requirement is CallerRunsPolicy, which makes the thread that submitted the task execute it when the queue is full. This effectively makes the producer do the consumer's work temporarily: the producer is not blocked in the submit call, but its subsequent submissions are delayed while it runs the task.


public static class CallerRunsPolicy implements RejectedExecutionHandler {
    /**
     * Creates a <tt>CallerRunsPolicy</tt>.
     */
    public CallerRunsPolicy() { }

    
    /**
     * Executes task r in the caller's thread, unless the executor
     * has been shut down, in which case the task is discarded.
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            r.run();
        }
    }
}
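A sketch of CallerRunsPolicy in action (the demo class is my own): when the single worker and the capacity-1 queue are both occupied, the rejected task runs synchronously on the submitting thread.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CallerRunsDemo {
    // When the pool and queue are saturated, CallerRunsPolicy runs the
    // task on the submitting thread instead of throwing.
    static boolean ranOnCaller() throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch block = new CountDownLatch(1);
        Thread caller = Thread.currentThread();
        final Thread[] runner = new Thread[1];
        try {
            // Occupies the single worker thread.
            pool.execute(() -> { try { block.await(); } catch (InterruptedException ignored) {} });
            pool.execute(() -> {});                                  // fills the queue
            pool.execute(() -> runner[0] = Thread.currentThread());  // rejected -> runs here
        } finally {
            block.countDown();
            pool.shutdown();
        }
        return runner[0] == caller;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(ranOnCaller()); // prints true
    }
}
```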

However, this strategy has a hidden danger. When there are few producers, the consumers may drain the queue while the producer is busy running the rejected task, leaving the queue empty; the producer only resumes submitting after it finishes that task, so the consumer threads can starve in the meantime.

Following the same line of thought, the simplest fix is to define a RejectedExecutionHandler that calls BlockingQueue.put instead, blocking the producer when the queue is full:


new RejectedExecutionHandler() {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        if (!executor.isShutdown()) {
            try {
                executor.getQueue().put(r); // blocks until the queue has room
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the interrupt status
            }
        }
    }
};
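Putting it together, a sketch of a pool built with this handler (the factory and demo names are my own): producers can keep calling execute and will simply block when the pool is saturated, so no task is lost.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class BlockingSubmitDemo {
    // A pool whose rejection handler re-inserts the task with the queue's
    // blocking put(), so a saturated pool blocks the producer instead of
    // dropping work.
    static ThreadPoolExecutor newBlockingPool(int workers, int capacity) {
        return new ThreadPoolExecutor(
                workers, workers, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(capacity),
                (r, executor) -> {
                    if (!executor.isShutdown()) {
                        try {
                            executor.getQueue().put(r); // block until space frees up
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    }
                });
    }

    static int runTasks(int taskCount) throws InterruptedException {
        ThreadPoolExecutor pool = newBlockingPool(2, 4);
        AtomicInteger done = new AtomicInteger();
        for (int i = 0; i < taskCount; i++) {
            pool.execute(done::incrementAndGet); // never throws; blocks instead
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        return done.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runTasks(100)); // prints 100
    }
}
```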

This way we no longer have to worry about the queue and consumer logic; we can focus on the producer and consumer implementations and simply submit tasks to the thread pool.

This approach requires much less code than the hand-rolled design and avoids many concurrency pitfalls. Of course, other methods are possible, such as using a semaphore to limit entry at submit time, but if all you want is for producers to block, they are more complicated.
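For completeness, a sketch of that semaphore variant (the class is hypothetical): acquire() blocks the producer once `bound` tasks are in flight, and each task releases its slot when it finishes.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class SemaphoreBoundedExecutor {
    private final ExecutorService pool;
    private final Semaphore slots;

    SemaphoreBoundedExecutor(ExecutorService pool, int bound) {
        this.pool = pool;
        this.slots = new Semaphore(bound);
    }

    // acquire() blocks the producer once `bound` tasks are queued or running;
    // each task frees its slot in a finally block.
    void submit(Runnable task) throws InterruptedException {
        slots.acquire();
        try {
            pool.execute(() -> {
                try {
                    task.run();
                } finally {
                    slots.release();
                }
            });
        } catch (RejectedExecutionException e) {
            slots.release(); // don't leak the slot if the pool refused the task
            throw e;
        }
    }

    static int runBounded(int taskCount, int bound) throws InterruptedException {
        ExecutorService es = Executors.newFixedThreadPool(2);
        SemaphoreBoundedExecutor bounded = new SemaphoreBoundedExecutor(es, bound);
        AtomicInteger done = new AtomicInteger();
        for (int i = 0; i < taskCount; i++) {
            bounded.submit(done::incrementAndGet);
        }
        es.shutdown();
        es.awaitTermination(10, TimeUnit.SECONDS);
        return done.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runBounded(50, 4)); // prints 50
    }
}
```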
