Proper Use of Python Thread Pool

  • 2021-11-29 07:48:30
  • OfStack

Contents:
  1. Why use thread pools?
  2. How to use the thread pool?
  3. How to get the result of thread execution without blocking
  4. Running strategy of thread pool


1. Why use thread pools?

Without a pool, a thread is destroyed once its task has finished, and a new thread has to be created the next time a task is executed. There is nothing logically wrong with this approach. However, starting a new thread is relatively expensive, because it involves the operating system, which has to allocate resources for every new thread.

An analogy: imagine a software company that hires a contractor whenever a job comes in and dismisses that person as soon as the job is finished. The time and communication costs of this constant hiring are considerable. In practice, a company decides how many developers a project needs when the project is set up and staffs the team once. Those people then stay on the project team, working when there is work and idling when there is none. A thread pool works the same way: it defines a maximum number of threads, which execute tasks when tasks are available and rest in the pool when there are none.
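The difference between the two approaches can be made visible with a small sketch. It is only an illustration: the task function record_thread, the task count of 30 and the pool size of 3 are assumptions chosen for this example. It runs the same tasks once with a new thread per task and once through a pool, and records which threads did the work.

```python
from concurrent.futures import ThreadPoolExecutor
import threading

TASKS = 30  # hypothetical number of tasks, chosen for illustration


def record_thread(names):
    # Remember the name of the thread that executed this task
    names.append(threading.current_thread().name)


# Approach 1: create a new thread for every task, then let it die
per_task_names = []
for _ in range(TASKS):
    worker = threading.Thread(target=record_thread, args=(per_task_names,))
    worker.start()
    worker.join()

# Approach 2: hand the same tasks to a pool of at most 3 threads
pooled_names = []
with ThreadPoolExecutor(max_workers=3) as pool:
    for _ in range(TASKS):
        pool.submit(record_thread, pooled_names)

print("threads created without a pool:", len(set(per_task_names)))
print("threads used by the pool:", len(set(pooled_names)))
```

The first count equals the number of tasks, while the second can never exceed the pool's max_workers, however many tasks are submitted.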

2. How to use the thread pool?

The base class of the thread pool is the Executor class in the concurrent.futures module. Executor has two subclasses, ThreadPoolExecutor and ProcessPoolExecutor: ThreadPoolExecutor is used to create thread pools, and ProcessPoolExecutor is used to create process pools. This article focuses on the ThreadPoolExecutor class. First, let's look at its constructor. The Python version used here is 3.6.7.


    def __init__(self, max_workers=None, thread_name_prefix=''):
        """Initializes a new ThreadPoolExecutor instance.

        Args:
            max_workers: The maximum number of threads that can be used to
                execute the given calls.
            thread_name_prefix: An optional name prefix to give our threads.
        """
        if max_workers is None:
            # Use this number because ThreadPoolExecutor is often
            # used to overlap I/O instead of CPU work.
            max_workers = (os.cpu_count() or 1) * 5
        if max_workers <= 0:
            raise ValueError("max_workers must be greater than 0")

        self._max_workers = max_workers
        self._work_queue = queue.Queue()
        self._threads = set()
        self._shutdown = False
        self._shutdown_lock = threading.Lock()
        self._thread_name_prefix = (thread_name_prefix or
                                    ("ThreadPoolExecutor-%d" % self._counter()))
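Two behaviours visible in this constructor, the rejection of a non-positive max_workers and the use of thread_name_prefix for worker names, can be checked directly. The following is a minimal sketch rather than part of the library: the prefix "demo" and the helper current_name are made up for the example.

```python
from concurrent.futures import ThreadPoolExecutor
import threading


def current_name():
    # Hypothetical helper: report the name of the worker thread running it
    return threading.current_thread().name


# A non-positive max_workers is rejected by the constructor
try:
    ThreadPoolExecutor(max_workers=0)
    rejected = False
except ValueError as error:
    rejected = True
    print("rejected:", error)

# thread_name_prefix becomes the leading part of every worker's name
with ThreadPoolExecutor(max_workers=2, thread_name_prefix="demo") as pool:
    worker_name = pool.submit(current_name).result()

print(worker_name)
```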

The constructor takes only two arguments:

  • max_workers: the maximum number of threads in the thread pool. If it is not specified, it defaults to 5 times the number of CPU cores (in the Python 3.6 source shown above).
  • thread_name_prefix: the name prefix given to the threads in the thread pool.

The constructor also initializes several internal attributes:

  • _shutdown: the initial value is False. The thread pool is not shut down by default, so unless shutdown() is called its life cycle is the same as that of the program.
  • _work_queue: queue.Queue() creates the buffer queue that holds submitted tasks.
  • _threads: the set of worker threads. It is empty as long as no task has been submitted.
  • _shutdown_lock: a threading.Lock that guards the shutdown state of the pool.

With the creation of the thread pool covered, here are the commonly used methods of the pool:

  • submit(fn, *args, **kwargs): submits a task, that is, hands the function fn to the thread pool. *args are the positional arguments passed to fn, and **kwargs are the keyword arguments passed to fn.
  • shutdown(wait=True): shuts down the thread pool.
  • map(func, *iterables, timeout=None, chunksize=1): similar to the built-in map(func, *iterables), but the calls to func are submitted to the pool immediately and run asynchronously on its threads.
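The three methods can be tried together in a few lines. This is a minimal sketch under stated assumptions: the task function power and its arguments are invented for the illustration and do not come from the article.

```python
from concurrent.futures import ThreadPoolExecutor


def power(base, exponent=2):
    # Hypothetical task function used only for this illustration
    return base ** exponent


pool = ThreadPoolExecutor(max_workers=2)

# submit(): positional and keyword arguments are forwarded to the function
future = pool.submit(power, 3, exponent=4)
submitted_result = future.result()

# map(): applies the function to every item; results keep the input order
mapped_results = list(pool.map(power, [1, 2, 3, 4]))

# shutdown(): with wait=True, returns only after pending tasks have finished
pool.shutdown(wait=True)

print(submitted_result)   # 81
print(mapped_results)     # [1, 4, 9, 16]
```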

After the program submits a task function to the thread pool through the submit method, the pool returns a Future object, which is mainly used to obtain the return value of the task function. Future provides the following methods.

  • cancel(): attempts to cancel the task represented by the Future. If the task is currently running or has already finished, it cannot be cancelled and the method returns False; otherwise the task is cancelled and the method returns True.
  • result(timeout=None): returns the value returned by the task represented by the Future. If the task has not completed yet, this method blocks the calling thread; the timeout parameter specifies the maximum number of seconds to block, and a TimeoutError is raised if the task has not finished by then.
  • add_done_callback(fn): registers a callback function for the task represented by the Future. When the task finishes or is cancelled, fn is called automatically with the Future as its only argument.
  • done(): returns True if the task represented by the Future was cancelled or has finished running.
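The behaviour of these four methods can be observed deterministically with a one-thread pool. The sketch below is an illustration, not a recommended pattern: the events started and release, the task blocked_task and the callback on_done are hypothetical names introduced here so that one task is guaranteed to be running while another is still queued.

```python
from concurrent.futures import ThreadPoolExecutor, TimeoutError
import threading

started = threading.Event()   # set by the task once it is running
release = threading.Event()   # set by the main thread to let the task finish
callback_log = []


def blocked_task():
    # Hypothetical task: signal that it started, then wait to be released
    started.set()
    release.wait()
    return "finished"


def on_done(future):
    # Called with the Future once it is cancelled or has finished running
    callback_log.append("cancelled" if future.cancelled() else future.result())


pool = ThreadPoolExecutor(max_workers=1)
running = pool.submit(blocked_task)   # occupies the only worker thread
queued = pool.submit(pow, 2, 10)      # has to wait in the queue
started.wait()

running_cancelled = running.cancel()  # False: the task is already running
queued_cancelled = queued.cancel()    # True: the task had not started yet
done_while_running = running.done()   # False: still waiting on `release`
done_after_cancel = queued.done()     # True: a cancelled task counts as done

try:
    running.result(timeout=0.1)       # blocks for at most 0.1 seconds
    timed_out = False
except TimeoutError:
    timed_out = True

running.add_done_callback(on_done)
queued.add_done_callback(on_done)     # already done, so called immediately
release.set()
final_result = running.result()       # blocks until the task returns
pool.shutdown(wait=True)              # by now both callbacks have run

print(running_cancelled, queued_cancelled, done_while_running, done_after_cancel)
print(timed_out, final_result, sorted(callback_log))
```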

Let's take a simple example:

In this example, a thread pool with at most 2 threads is created to execute the async_add function.


from concurrent.futures import ThreadPoolExecutor
import threading
import time


def async_add(limit):
    # Sum the integers in range(limit), then sleep to simulate a slow task
    total = 0
    for i in range(limit):
        total = total + i
    time.sleep(1)
    print(threading.current_thread().name + ": the sum is = " + str(total))
    return total

# Create a thread pool with at most two worker threads
pool = ThreadPoolExecutor(max_workers=2, thread_name_prefix='TestThread')
# Submit a task to the thread pool; 20 is passed as the argument of async_add()
future1 = pool.submit(async_add, 20)
# Submit a second task to the thread pool
future2 = pool.submit(async_add, 50)
# Wait 2 seconds, then check whether the tasks have finished
time.sleep(2)
print(future1.done())
print(future2.done())
# Get the result returned by the task represented by future1
print('Result of task 1 = ' + str(future1.result()))
# Get the result returned by the task represented by future2
print('Result of task 2 = ' + str(future2.result()))
print("----" + threading.current_thread().name + "---- end of main thread ----")

The result of the run is:

TestThread_0: the sum is = 190
TestThread_1: the sum is = 1225
True
True
Result of task 1 = 190
Result of task 2 = 1225
----MainThread---- end of main thread ----

This example defines a thread pool with at most 2 threads and submits two tasks to it, with async_add as the task function. async_add calls time.sleep(1) to pause for 1 second so that the value returned by done() can be observed: because the main thread waits 2 seconds before calling done(), both tasks have already finished and done() returns True. The main thread's final message is printed only after both result() calls have returned, because result() blocks until the task's result is available.
If the time.sleep(2) call and the result() calls are removed, the code looks like this:


# Reuses the imports and async_add() from the previous example
# Create a thread pool with at most two worker threads
pool = ThreadPoolExecutor(max_workers=2, thread_name_prefix='TestThread')
# Submit a task to the thread pool; 20 is passed as the argument of async_add()
future1 = pool.submit(async_add, 20)
# Submit a second task to the thread pool
future2 = pool.submit(async_add, 50)
# Check whether the tasks have finished, without waiting for them
print(future1.done())
print(future2.done())
print("----" + threading.current_thread().name + "---- end of main thread ----")

The result of the run is:

False
False
----MainThread---- end of main thread ----
TestThread_0: the sum is = 190
TestThread_1: the sum is = 1225

3. How to get the result of thread execution without blocking

The result() method described earlier obtains the result of a task by blocking. How can the final return value of a task be obtained without blocking? For that, register a callback function on the Future and read the result inside the callback.


from concurrent.futures import ThreadPoolExecutor
import threading
import time


def async_add(limit):
    # Sum the integers in range(limit), then sleep to simulate a slow task
    total = 0
    for i in range(limit):
        total = total + i
    time.sleep(1)
    print(threading.current_thread().name + ": the sum is = " + str(total))
    return total


def get_result(future):
    # Callback: receives the finished Future and prints its result
    print(threading.current_thread().name + ' result: ' + str(future.result()))


with ThreadPoolExecutor(max_workers=2) as pool:
    # Submit two tasks to the thread pool
    future1 = pool.submit(async_add, 20)
    future2 = pool.submit(async_add, 50)
    # Register the callback for the task represented by future1
    future1.add_done_callback(get_result)
    # Register the callback for the task represented by future2
    future2.add_done_callback(get_result)
    print('------------ end of main thread ----')

The result of the run is:

------------ end of main thread ----
ThreadPoolExecutor-0_1: the sum is = 1225
ThreadPoolExecutor-0_1 result: 1225
ThreadPoolExecutor-0_0: the sum is = 190
ThreadPoolExecutor-0_0 result: 190

The output shows that obtaining the results this way does not block the main thread: its final message is printed before either task has finished. Here add_done_callback registers the function get_result, which is called with the Future once the task completes and reads the result there. As the thread names in the output show, the callback runs in the worker thread of the pool.
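One detail worth illustrating is that the callback is also invoked when the task raises an exception, and calling future.result() inside the callback would then re-raise it. The sketch below is an illustration under assumptions: the task failing_task, the callback record_outcome and the outcomes list are invented here. It uses Future.exception() to tell the two cases apart.

```python
from concurrent.futures import ThreadPoolExecutor

outcomes = []


def failing_task():
    # Hypothetical task that always fails
    raise ValueError("boom")


def record_outcome(future):
    # exception() returns the raised exception, or None if the task succeeded
    error = future.exception()
    if error is not None:
        outcomes.append("failed: " + repr(error))
    else:
        outcomes.append("ok: " + str(future.result()))


with ThreadPoolExecutor(max_workers=2) as pool:
    pool.submit(failing_task).add_done_callback(record_outcome)
    pool.submit(pow, 2, 5).add_done_callback(record_outcome)

for line in sorted(outcomes):
    print(line)
```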
Because the thread pool implements the context management protocol, the program can use the with statement to manage the pool, which avoids having to shut it down manually.
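What the with statement does on exit can be checked with a short sketch. The task slow_square and its inputs are assumptions made for the illustration. Leaving the block is equivalent to calling shutdown(wait=True), so every submitted task has finished by the time the next statement runs, and the pool no longer accepts new tasks.

```python
from concurrent.futures import ThreadPoolExecutor
import time


def slow_square(number):
    # Hypothetical task: pretend to work for a moment, then return a value
    time.sleep(0.1)
    return number * number


with ThreadPoolExecutor(max_workers=2) as pool:
    futures = [pool.submit(slow_square, n) for n in range(4)]
# Leaving the with block calls pool.shutdown(wait=True)

all_done = all(future.done() for future in futures)
results = [future.result() for future in futures]

# A pool that has been shut down refuses new tasks
try:
    pool.submit(slow_square, 5)
    accepted_after_shutdown = True
except RuntimeError:
    accepted_after_shutdown = False

print(all_done, results, accepted_after_shutdown)
```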

4. Running strategy of thread pool

Finally, it is worth introducing the execution strategy of the thread pool: when more tasks are submitted than the pool has threads, how does the pool handle them? Are the tasks that cannot be handled right away discarded, or are they processed gradually? Before answering this question, let's look at the following example, which defines a thread pool with at most 4 threads and then submits 100 tasks to it.


from concurrent.futures import ThreadPoolExecutor
import threading
import time


def async_add(limit):
    # Sum the integers in range(limit), then sleep to simulate a slow task
    total = 0
    for i in range(limit):
        total = total + i
    time.sleep(1)
    print(threading.current_thread().name + ": the sum is = " + str(total))
    return total


with ThreadPoolExecutor(max_workers=4) as pool:
    # Submit 100 tasks to a pool that has at most 4 threads
    for i in range(100):
        pool.submit(async_add, i)
    print('------------ end of main thread ----')

The result of the run is:

------------ end of main thread ----
ThreadPoolExecutor-0_1: the sum is = 0
ThreadPoolExecutor-0_0: the sum is = 0
ThreadPoolExecutor-0_3: the sum is = 3
ThreadPoolExecutor-0_2: the sum is = 1
... part of the output omitted ...
ThreadPoolExecutor-0_1: the sum is = 4656
ThreadPoolExecutor-0_2: the sum is = 4753
ThreadPoolExecutor-0_0: the sum is = 4560
ThreadPoolExecutor-0_3: the sum is = 4851

The output shows that the same four threads are reused to execute all of these tasks, that no task is discarded, and that the tasks are started on a first-come, first-served basis. This is due to the default buffer queue of the thread pool. The statement self._work_queue = queue.Queue() creates a buffer queue of unlimited size, and this queue is an ordinary FIFO (first in, first out) queue. Therefore, when there are more tasks than threads, the extra tasks wait in the buffer queue, and whenever a thread becomes idle it takes the next task out of the queue and executes it.
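The FIFO behaviour is easiest to verify with a single worker, because then the queue order is the only thing that decides the execution order. This is a minimal sketch with made-up names (run_task, execution_order) and an arbitrary task count of 20.

```python
from concurrent.futures import ThreadPoolExecutor

execution_order = []


def run_task(task_id):
    # Hypothetical task: record the order in which tasks actually start
    execution_order.append(task_id)
    return task_id


# With a single worker, the queue order is exactly the execution order
with ThreadPoolExecutor(max_workers=1) as pool:
    futures = [pool.submit(run_task, task_id) for task_id in range(20)]

completed = [future.result() for future in futures]
print(execution_order == list(range(20)))   # True: first in, first out
print(len(completed))                       # 20: no task was discarded
```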
queue.Queue has a maxsize parameter that limits the size of the queue. Once the queue holds maxsize items it is full, and adding another item blocks until an item in the queue has been consumed. If maxsize is set to 0 or a negative number, the size of the queue is unlimited.
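The effect of maxsize can be seen on queue.Queue directly, independent of the thread pool. The sketch below uses put_nowait(), which raises queue.Full where put() would block, so that the example cannot hang; the item values and the size of 2 are arbitrary.

```python
import queue

bounded = queue.Queue(maxsize=2)
bounded.put("a")
bounded.put("b")
is_full = bounded.full()

# put_nowait() raises queue.Full instead of blocking when the queue is full
try:
    bounded.put_nowait("c")
    overflowed = False
except queue.Full:
    overflowed = True

# Items leave in the order they entered (FIFO), which frees a slot
first_out = bounded.get()
bounded.put_nowait("c")   # succeeds now that one item has been consumed

# maxsize=0 (the default) means the queue never becomes full
unbounded = queue.Queue(maxsize=0)
for item in range(1000):
    unbounded.put_nowait(item)

print(is_full, overflowed, first_out, unbounded.qsize())
```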

