Method of Tomcat Using Thread Pool to Handle Remote Concurrent Requests

  • 2021-09-25 00:11:26
  • OfStack

Learn how tomcat handles concurrent requests, and learn about thread pools, locks, queues, unsafe classes. The following main code comes from

java-jre:


sun.misc.Unsafe
java.util.concurrent.ThreadPoolExecutor
java.util.concurrent.ThreadPoolExecutor.Worker
java.util.concurrent.locks.AbstractQueuedSynchronizer
java.util.concurrent.locks.AbstractQueuedLongSynchronizer
java.util.concurrent.LinkedBlockingQueue

tomcat:


org.apache.tomcat.util.net.NioEndpoint
org.apache.tomcat.util.threads.ThreadPoolExecutor
org.apache.tomcat.util.threads.TaskThreadFactory
org.apache.tomcat.util.threads.TaskQueue

ThreadPoolExecutor

Is a thread pool implementation class, which manages threads and reduces thread overhead, and can be used to improve task execution efficiency.

The parameters in the constructor are


public ThreadPoolExecutor(
 int corePoolSize,
 int maximumPoolSize,
 long keepAliveTime,
 TimeUnit unit,
 BlockingQueue<Runnable> workQueue,
 ThreadFactory threadFactory,
 RejectedExecutionHandler handler) {
 
}

corePoolSize is the number of core threads
maximumPoolSize is the maximum number of threads
keepAliveTime Non-Core Thread Maximum Idle Time (over Time Termination)
unit Time Unit
workQueue queue, when there are too many tasks, it is stored in the queue first
threadFactory Thread Factory, a factory for creating threads
handler refuses the strategy, when the number of tasks is too much, the queue can no longer store tasks, how to deal with it, so the object can deal with it. This is an interface, and you can customize the processing method

Application of ThreadPoolExecutor to http Request in Tomcat

This thread pool is used by tomcat to process each request as a single task after receiving the remote request, and call execute every time (Runnable)

Initialization

org.apache.tomcat.util.net.NioEndpoint

When NioEndpoint is initialized, a thread pool is created


public void createExecutor() {
 internalExecutor = true;
 TaskQueue taskqueue = new TaskQueue();
 //TaskQueue Unbounded queues, which can 1 Directly added, so handler  Equivalent to invalid 
 TaskThreadFactory tf = new TaskThreadFactory(getName() + "-exec-", daemon, getThreadPriority());
 executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), 60, TimeUnit.SECONDS,taskqueue, tf);
 taskqueue.setParent( (ThreadPoolExecutor) executor);
 }

When the thread pool is created, call prestartAllCoreThreads (), initialize the core worker thread worker, and start


public int prestartAllCoreThreads() {
 int n = 0;
 while (addWorker(null, true))
  ++n;
 return n;
 }

When the number of addWorker is equal to corePoolSize, addWorker (null, ture) returns false, stopping the creation of worker worker threads

Submit a task to a queue

Every time the client comes to request (http), the processing task will be submitted once.

worker gets the task from the queue to run, and the following is the logical code for putting the task into the queue

ThreadPoolExecutor. execute (Runnable) Submit task:


public void execute(Runnable command) {
 if (command == null)
  throw new NullPointerException();
 
 int c = ctl.get();
 	// worker Number   Is less than   Number of core threads  tomcat After initialization in, 1 General dissatisfaction 1 A condition, no addWorker
 if (workerCountOf(c) < corePoolSize) {
  if (addWorker(command, true))
  return;
  c = ctl.get();
 }
 	// workQueue.offer(command) Add tasks to the queue, 
 if (isRunning(c) && workQueue.offer(command)) {
  int recheck = ctl.get();
  if (! isRunning(recheck) && remove(command))
  reject(command);
  else if (workerCountOf(recheck) == 0)
  addWorker(null, false);
 }
 else if (!addWorker(command, false))
  reject(command);
 }

workQueue. offer (command) completes the task submission (while tomcat processes remote http requests).

workQueue.offer

TaskQueue is the BlockingQueue concrete implementation class, workQueue. offer (command) actual code:


public boolean offer(E e) {
 if (e == null) throw new NullPointerException();
 final AtomicInteger count = this.count;
 if (count.get() == capacity)
 return false;
 int c = -1;
 Node<E> node = new Node<E>(e);
 final ReentrantLock putLock = this.putLock;
 putLock.lock();
 try {
 if (count.get() < capacity) {
  enqueue(node); // Add tasks to the queue here 
  c = count.getAndIncrement();
  if (c + 1 < capacity)
  notFull.signal();
 }
 } finally {
 putLock.unlock();
 }
 if (c == 0)
 signalNotEmpty();
 return c >= 0;
}

//  Add a task to a queue 
/**
 * Links node at end of queue.
 *
 * @param node the node
 */
private void enqueue(Node<E> node) {
 // assert putLock.isHeldByCurrentThread();
 // assert last.next == null;
 last = last.next = node; // Linked list structure  last.next = node; last = node
}

This is followed by the work of worker, which in the run method obtains the task submitted here by going to getTask () and executes the completed task.

How does the thread pool handle newly submitted tasks

After worker is added, the task is submitted. Because the number of worker reaches corePoolSize, the task will be put into the queue, while the run method of worker loops to obtain the task in the queue (when it is not empty).

worker run Method:


/** Delegates main run loop to outer runWorker */
 public void run() {
  runWorker(this);
 }

Loop to get tasks in the queue

runWorker (worker) method loop part code:


final void runWorker(Worker w) {
 Thread wt = Thread.currentThread();
 Runnable task = w.firstTask;
 w.firstTask = null;
 w.unlock(); // allow interrupts
 boolean completedAbruptly = true;
 try {
  while (task != null || (task = getTask()) != null) { // Loop to get tasks in the queue 
  w.lock(); //  Lock 
  try {
   //  Pre-run processing 
   beforeExecute(wt, task);
   //  The task in the queue begins to execute 
   task.run();
   //  Run post-processing 
   afterExecute(task, thrown);
  } finally {
   task = null;
   w.completedTasks++;
   w.unlock(); //  Release lock 
  }
  }
  completedAbruptly = false;
 } finally {
  processWorkerExit(w, completedAbruptly);
 }
 }

task. run () performs tasks

Lock application

ThreadPoolExecutor uses locks to guarantee two main things,
1. Add tasks to the queue to ensure that other threads can't operate the queue
2. Get the tasks of the queue to ensure that other threads can't operate the queue at the same time

Add a task lock to the queue


public boolean offer(E e) {
 if (e == null) throw new NullPointerException();
 final AtomicInteger count = this.count;
 if (count.get() == capacity)
  return false;
 int c = -1;
 Node<E> node = new Node<E>(e);
 final ReentrantLock putLock = this.putLock;
 putLock.lock(); // Lock 
 try {
  if (count.get() < capacity) {
  enqueue(node);
  c = count.getAndIncrement();
  if (c + 1 < capacity)
   notFull.signal();
  }
 } finally {
  putLock.unlock(); // Release lock 
 }
 if (c == 0)
  signalNotEmpty();
 return c >= 0;
 }

Get queue task lock


org.apache.tomcat.util.net.NioEndpoint
org.apache.tomcat.util.threads.ThreadPoolExecutor
org.apache.tomcat.util.threads.TaskThreadFactory
org.apache.tomcat.util.threads.TaskQueue
0

volatile

It is common for this keyword to decorate member variables in concurrent scenarios,

The main purpose public variable is visible to other threads when it is modified by one thread (real-time)

sun. misc. Unsafe High Concurrency Related Classes

Thread pool use, there are ordinary use of Unsafe class, this class in high concurrency, can do some atomic CAS operations, lock threads, release threads and so on.

sun.misc.Unsafe Class is the underlying class, openjdk source code has

Atomic operation data

The code that guarantees atomic operations is found in the java. util. concurrent. locks. AbstractQueuedSynchronizer classes


org.apache.tomcat.util.net.NioEndpoint
org.apache.tomcat.util.threads.ThreadPoolExecutor
org.apache.tomcat.util.threads.TaskThreadFactory
org.apache.tomcat.util.threads.TaskQueue
1

The code corresponding to the Unsafe class:


org.apache.tomcat.util.net.NioEndpoint
org.apache.tomcat.util.threads.ThreadPoolExecutor
org.apache.tomcat.util.threads.TaskThreadFactory
org.apache.tomcat.util.threads.TaskQueue
2

The function of the method is simply to update a value and ensure atomicity operation
When you want to manipulate 1 object o 1 member variable of offset Modify o. offset,
In order to ensure accuracy under high concurrency, when you operate o. offset, the read should be the correct value, and it cannot be modified by other threads to ensure the high concurrency environment data operation is effective.

That is to say, if the expected value of expected is compared with the value in memory, expected = = the value in memory, the updated value is x, and returning true means that the modification is successful

Otherwise, the expected value is different from the memory value, indicating that the value has been modified by other threads, and the value cannot be updated to x, and false is returned to tell the operator that the atomicity modification failed.

Blocking and Waking Threads

public native void park (boolean isAbsolute, long time); //Block the current thread

Thread pool's worker role obtains queue task cyclically. If there is no task in the queue, worker. run is still waiting and will not exit the thread. It is used in the code notEmpty.await() Interrupt this worker thread and put it into a waiting thread queue (distinguish the task queue); When a new task is needed, notEmpty.signal() Wake up this thread

The bottom layers are
unsafe. park () blocks the current thread
public native void park(boolean isAbsolute, long time);

unsafe. unpark () Wake Thread
public native void unpark(Object thread);

This operation is corresponding. When blocking, put thread into the queue first. When waking up, take out the blocked thread from the queue, and unsafe. unpark (thread) wakes up the specified thread.

java.util.concurrent.locks.AbstractQueuedLongSynchronizer.ConditionObject Class

Store thread information through linked list


//  Add 1 Blocking threads 
private Node addConditionWaiter() {
  Node t = lastWaiter;
  // If lastWaiter is cancelled, clean out.
  if (t != null && t.waitStatus != Node.CONDITION) {
  unlinkCancelledWaiters();
  t = lastWaiter;
  }
  Node node = new Node(Thread.currentThread(), Node.CONDITION);
  if (t == null)
  firstWaiter = node;
  else
  t.nextWaiter = node;
  lastWaiter = node; // Put the newly blocked thread at the end of the linked list 
  return node;
 }

//  Take out 1 Blocked threads 
 public final void signal() {
  if (!isHeldExclusively())
  throw new IllegalMonitorStateException();
  Node first = firstWaiter; // In the linked list 1 Blocked threads 
  if (first != null)
  doSignal(first);
 }

//  After getting it, wake up this thread 
final boolean transferForSignal(Node node) {
  LockSupport.unpark(node.thread);
 return true;
 }
public static void unpark(Thread thread) {
 if (thread != null)
  UNSAFE.unpark(thread);
 }

Related articles: