Method of Tomcat Using Thread Pool to Handle Remote Concurrent Requests
- 2021-09-25 00:11:26
- OfStack
Learn how tomcat handles concurrent requests, and learn about thread pools, locks, queues, unsafe classes. The following main code comes from
java-jre:
sun.misc.Unsafe
java.util.concurrent.ThreadPoolExecutor
java.util.concurrent.ThreadPoolExecutor.Worker
java.util.concurrent.locks.AbstractQueuedSynchronizer
java.util.concurrent.locks.AbstractQueuedLongSynchronizer
java.util.concurrent.LinkedBlockingQueue
tomcat:
org.apache.tomcat.util.net.NioEndpoint
org.apache.tomcat.util.threads.ThreadPoolExecutor
org.apache.tomcat.util.threads.TaskThreadFactory
org.apache.tomcat.util.threads.TaskQueue
ThreadPoolExecutor
Is a thread pool implementation class, which manages threads and reduces thread overhead, and can be used to improve task execution efficiency.
The parameters in the constructor are
public ThreadPoolExecutor(
int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
}
corePoolSize is the number of core threads
maximumPoolSize is the maximum number of threads
keepAliveTime Non-Core Thread Maximum Idle Time (over Time Termination)
unit Time Unit
workQueue queue, when there are too many tasks, it is stored in the queue first
threadFactory Thread Factory, a factory for creating threads
handler refuses the strategy, when the number of tasks is too much, the queue can no longer store tasks, how to deal with it, so the object can deal with it. This is an interface, and you can customize the processing method
Application of ThreadPoolExecutor to http Request in Tomcat
This thread pool is used by tomcat to process each request as a single task after receiving the remote request, and call execute every time (Runnable)
Initialization
org.apache.tomcat.util.net.NioEndpoint
When NioEndpoint is initialized, a thread pool is created
public void createExecutor() {
internalExecutor = true;
TaskQueue taskqueue = new TaskQueue();
//TaskQueue Unbounded queues, which can 1 Directly added, so handler Equivalent to invalid
TaskThreadFactory tf = new TaskThreadFactory(getName() + "-exec-", daemon, getThreadPriority());
executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), 60, TimeUnit.SECONDS,taskqueue, tf);
taskqueue.setParent( (ThreadPoolExecutor) executor);
}
When the thread pool is created, call prestartAllCoreThreads (), initialize the core worker thread worker, and start
public int prestartAllCoreThreads() {
int n = 0;
while (addWorker(null, true))
++n;
return n;
}
When the number of addWorker is equal to corePoolSize, addWorker (null, ture) returns false, stopping the creation of worker worker threads
Submit a task to a queue
Every time the client comes to request (http), the processing task will be submitted once.
worker gets the task from the queue to run, and the following is the logical code for putting the task into the queue
ThreadPoolExecutor. execute (Runnable) Submit task:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// worker Number Is less than Number of core threads tomcat After initialization in, 1 General dissatisfaction 1 A condition, no addWorker
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// workQueue.offer(command) Add tasks to the queue,
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
workQueue. offer (command) completes the task submission (while tomcat processes remote http requests).
workQueue.offer
TaskQueue is the BlockingQueue concrete implementation class, workQueue. offer (command) actual code:
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final AtomicInteger count = this.count;
if (count.get() == capacity)
return false;
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
if (count.get() < capacity) {
enqueue(node); // Add tasks to the queue here
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
}
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return c >= 0;
}
// Add a task to a queue
/**
* Links node at end of queue.
*
* @param node the node
*/
private void enqueue(Node<E> node) {
// assert putLock.isHeldByCurrentThread();
// assert last.next == null;
last = last.next = node; // Linked list structure last.next = node; last = node
}
This is followed by the work of worker, which in the run method obtains the task submitted here by going to getTask () and executes the completed task.
How does the thread pool handle newly submitted tasks
After worker is added, the task is submitted. Because the number of worker reaches corePoolSize, the task will be put into the queue, while the run method of worker loops to obtain the task in the queue (when it is not empty).
worker run Method:
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
Loop to get tasks in the queue
runWorker (worker) method loop part code:
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) { // Loop to get tasks in the queue
w.lock(); // Lock
try {
// Pre-run processing
beforeExecute(wt, task);
// The task in the queue begins to execute
task.run();
// Run post-processing
afterExecute(task, thrown);
} finally {
task = null;
w.completedTasks++;
w.unlock(); // Release lock
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
task. run () performs tasks
Lock application
ThreadPoolExecutor uses locks to guarantee two main things,
1. Add tasks to the queue to ensure that other threads can't operate the queue
2. Get the tasks of the queue to ensure that other threads can't operate the queue at the same time
Add a task lock to the queue
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final AtomicInteger count = this.count;
if (count.get() == capacity)
return false;
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
putLock.lock(); // Lock
try {
if (count.get() < capacity) {
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
}
} finally {
putLock.unlock(); // Release lock
}
if (c == 0)
signalNotEmpty();
return c >= 0;
}
Get queue task lock
org.apache.tomcat.util.net.NioEndpoint
org.apache.tomcat.util.threads.ThreadPoolExecutor
org.apache.tomcat.util.threads.TaskThreadFactory
org.apache.tomcat.util.threads.TaskQueue
0
volatile
It is common for this keyword to decorate member variables in concurrent scenarios,
The main purpose public variable is visible to other threads when it is modified by one thread (real-time)
sun. misc. Unsafe High Concurrency Related Classes
Thread pool use, there are ordinary use of Unsafe class, this class in high concurrency, can do some atomic CAS operations, lock threads, release threads and so on.
sun.misc.Unsafe
Class is the underlying class, openjdk source code has
Atomic operation data
The code that guarantees atomic operations is found in the java. util. concurrent. locks. AbstractQueuedSynchronizer classes
org.apache.tomcat.util.net.NioEndpoint
org.apache.tomcat.util.threads.ThreadPoolExecutor
org.apache.tomcat.util.threads.TaskThreadFactory
org.apache.tomcat.util.threads.TaskQueue
1
The code corresponding to the Unsafe class:
org.apache.tomcat.util.net.NioEndpoint
org.apache.tomcat.util.threads.ThreadPoolExecutor
org.apache.tomcat.util.threads.TaskThreadFactory
org.apache.tomcat.util.threads.TaskQueue
2
The function of the method is simply to update a value and ensure atomicity operation
When you want to manipulate 1 object
o
1 member variable of
offset
Modify o. offset,
In order to ensure accuracy under high concurrency, when you operate o. offset, the read should be the correct value, and it cannot be modified by other threads to ensure the high concurrency environment data operation is effective.
That is to say, if the expected value of expected is compared with the value in memory, expected = = the value in memory, the updated value is x, and returning true means that the modification is successful
Otherwise, the expected value is different from the memory value, indicating that the value has been modified by other threads, and the value cannot be updated to x, and false is returned to tell the operator that the atomicity modification failed.
Blocking and Waking Threads
public native void park (boolean isAbsolute, long time); //Block the current thread
Thread pool's worker role obtains queue task cyclically. If there is no task in the queue, worker. run is still waiting and will not exit the thread. It is used in the code
notEmpty.await()
Interrupt this worker thread and put it into a waiting thread queue (distinguish the task queue); When a new task is needed,
notEmpty.signal()
Wake up this thread
The bottom layers are
unsafe. park () blocks the current thread
public native void park(boolean isAbsolute, long time);
unsafe. unpark () Wake Thread
public native void unpark(Object thread);
This operation is corresponding. When blocking, put thread into the queue first. When waking up, take out the blocked thread from the queue, and unsafe. unpark (thread) wakes up the specified thread.
java.util.concurrent.locks.AbstractQueuedLongSynchronizer.ConditionObject
Class
Store thread information through linked list
// Add 1 Blocking threads
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node; // Put the newly blocked thread at the end of the linked list
return node;
}
// Take out 1 Blocked threads
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter; // In the linked list 1 Blocked threads
if (first != null)
doSignal(first);
}
// After getting it, wake up this thread
final boolean transferForSignal(Node node) {
LockSupport.unpark(node.thread);
return true;
}
public static void unpark(Thread thread) {
if (thread != null)
UNSAFE.unpark(thread);
}