Python Multithreading with the concurrent Module
- 2021-10-27 08:08:02
- OfStack
Introduction
I have written several blog posts about threads before, all using threading. Today I will cover concurrent, another module that ships with Python's standard library. concurrent (specifically concurrent.futures) was introduced in Python 3.2 and lets you set up a thread pool or process pool in just a few lines of code. For CPU-bound tasks its performance is on par with multiprocessing.Pool and multiprocessing.pool.ThreadPool, and for IO-bound tasks it can be several times faster thanks to its Future abstraction. With plain threading, by contrast, you have to maintain your own queues to coordinate work and avoid deadlocks, and the code becomes harder to read. The pools in concurrent handle that bookkeeping for you, so you write far less pool code and have fewer deadlocks to worry about. Because results are delivered asynchronously through Futures, IO-bound tasks benefit the most.
concurrent is very easy to use. It mainly provides two classes: ThreadPoolExecutor for multithreading and ProcessPoolExecutor for multiprocessing. Under the hood, though, concurrent.futures is a wrapper around threading and multiprocessing, as its source code shows, so at the lowest level it is not truly asynchronous.
ThreadPoolExecutor maintains its own task queue, so you do not need to write one yourself. The "pool" logic itself is simple: when a task is submitted, the executor compares the current number of worker threads with max_workers and, if it is lower, starts a new thread to take tasks from the queue (in recent Python versions, only if no idle worker is available).
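The cap on worker threads can be observed directly. The small experiment below is a sketch using only the standard library (the record_worker function and the "worker" name prefix are hypothetical choices for illustration): it submits six tasks to a two-worker pool and records the name of the thread that ran each one. No more than two distinct worker names should appear.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

def record_worker(task_id):
    # Sleep briefly so several tasks overlap and the pool needs its workers
    time.sleep(0.05)
    # Report which pool thread actually ran this task
    return threading.current_thread().name

# thread_name_prefix makes the worker threads easy to recognise
with ThreadPoolExecutor(max_workers=2, thread_name_prefix="worker") as pool:
    names = list(pool.map(record_worker, range(6)))

# Six tasks, but at most two distinct threads ran them
print(sorted(set(names)))
```

Raising max_workers in this sketch lets more distinct names appear, up to the number of tasks that overlap in time.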
Working with thread and process pools
1. Create a thread pool
A thread pool object is created with the ThreadPoolExecutor class, and max_workers sets the maximum number of threads that run at the same time. The advantage of ThreadPoolExecutor is that it manages the task queue and the worker threads for you, so there is far less room for the deadlocks that hand-written threading code can run into, which makes multithreaded programming simpler.
from concurrent import futures
pool = futures.ThreadPoolExecutor(max_workers=2)
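The examples in this article create a pool and leave it open. ThreadPoolExecutor can also be used as a context manager: leaving the with-block calls shutdown(wait=True), which waits for all submitted tasks and then refuses new ones. A minimal sketch, with add as a hypothetical task:

```python
from concurrent import futures

def add(a, b):
    # Hypothetical example task
    return a + b

# Leaving the with-block calls pool.shutdown(wait=True), so every
# submitted task has finished before the next statement runs.
with futures.ThreadPoolExecutor(max_workers=2) as pool:
    future = pool.submit(add, 2, 3)

print(future.done())    # True
print(future.result())  # 5

# A pool that has been shut down refuses new work with RuntimeError
rejected = False
try:
    pool.submit(add, 1, 1)
except RuntimeError:
    rejected = True
print(rejected)  # True
```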
2. submit
submit(fn, *args, **kwargs)
fn: the function to execute asynchronously
*args, **kwargs: the arguments passed to fn
submit schedules fn(*args, **kwargs) on the pool and immediately returns a Future object representing the pending call, so, as the example below shows, it does not block the main thread.
import datetime
import time
from concurrent import futures
import requests  # third-party: pip install requests
def get_request(url):
    r = requests.get(url)
    print('{}:{} {}'.format(datetime.datetime.now(), url, r.status_code))
urls = ['https://www.baidu.com', 'https://www.tmall.com', 'https://www.jd.com']
pool = futures.ThreadPoolExecutor(max_workers=2)
for url in urls:
    # submit() returns a Future immediately; the request runs in a worker thread
    task = pool.submit(get_request, url)
print('{}: Main thread'.format(datetime.datetime.now()))
time.sleep(2)  # pause the main thread while the workers finish
# Output result
2021-03-12 15:29:10.780141: Main thread
2021-03-12 15:29:10.865425:https://www.baidu.com 200
2021-03-12 15:29:10.923062:https://www.tmall.com 200
2021-03-12 15:29:10.940930:https://www.jd.com 200
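Because submit returns immediately, the Future it hands back is how you get at the result later. The sketch below replaces the network request with a local function so it runs offline (slow_square and report are hypothetical names): done() checks whether the call has finished, result() blocks until it has (optionally with a timeout), and add_done_callback() registers a function to run on completion.

```python
import time
from concurrent import futures

def slow_square(x):
    # Stand-in for an IO-bound call such as get_request
    time.sleep(0.1)
    return x * x

def report(fut):
    # Runs when the future finishes: in the worker thread, or immediately
    # in the caller if the future is already done when the callback is added
    print("callback got", fut.result())

with futures.ThreadPoolExecutor(max_workers=2) as pool:
    task = pool.submit(slow_square, 7)
    # Usually False here, because the task is still sleeping
    print("done right after submit?", task.done())
    task.add_done_callback(report)
    value = task.result(timeout=5)  # blocks until the result is ready
    print("result:", value)
```

The callback and the final print may appear in either order, since the callback can run in the worker thread.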
3. map
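map(fn, *iterables, timeout=None, chunksize=1)
fn: the function to execute asynchronously
*iterables: one or more iterables (list, tuple, etc.) whose items are passed to fn
timeout: if a result is not available within timeout seconds of the call to map, iterating raises TimeoutError; None means no limit
chunksize: only used by ProcessPoolExecutor; it has no effect with ThreadPoolExecutor
map is a concise way to run fn once per item. Like submit, calling map does not block the main thread, because all calls are submitted immediately. Iterating over the returned iterator does block until each result is ready, and results are yielded in the order of the inputs.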
import datetime
import time
from concurrent import futures
import requests  # third-party: pip install requests
def get_request(url):
    r = requests.get(url)
    print('{}:{} {}'.format(datetime.datetime.now(), url, r.status_code))
urls = ['https://www.baidu.com', 'https://www.tmall.com', 'https://www.jd.com']
pool = futures.ThreadPoolExecutor(max_workers=2)
# map() submits one call per URL and returns a lazy iterator of results
tasks = pool.map(get_request, urls)
print('{}: Main thread'.format(datetime.datetime.now()))
time.sleep(2)  # pause the main thread while the workers finish
# Output result
2021-03-12 16:14:04.854452: Main thread
2021-03-12 16:14:04.938870:https://www.baidu.com 200
2021-03-12 16:14:05.033849:https://www.jd.com 200
2021-03-12 16:14:05.048952:https://www.tmall.com 200
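In the output above the lines appear in completion order because get_request prints from inside each worker. The values returned by map, by contrast, come back in input order. A minimal offline sketch, with delayed_echo as a hypothetical task whose duration varies:

```python
import time
from concurrent import futures

def delayed_echo(delay):
    # Longer delays finish later, so completion order differs from input order
    time.sleep(delay)
    return delay

delays = [0.3, 0.1, 0.2]
with futures.ThreadPoolExecutor(max_workers=3) as pool:
    # map() submits every call at once; iterating the result blocks as needed
    results = list(pool.map(delayed_echo, delays))

print(results)  # [0.3, 0.1, 0.2]: same order as the inputs
```

If you need results as soon as each task finishes rather than in input order, submit plus as_completed (covered in the exception-handling section) is the usual alternative.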
4. wait
If you want the main thread to wait until the worker threads have finished before it continues, use wait.
futures.wait(fs, timeout=None, return_when=ALL_COMPLETED)
fs: the futures (tasks) to wait for
timeout: maximum number of seconds to wait; None means no limit
return_when: one of three constants:
FIRST_COMPLETED: return as soon as any future finishes or is cancelled.
FIRST_EXCEPTION: return as soon as any future finishes by raising an exception; if none raises, this is equivalent to ALL_COMPLETED.
ALL_COMPLETED: the default; return when all futures have finished or been cancelled.
wait returns a named 2-tuple of sets, (done, not_done).
import datetime
import time
from concurrent import futures
import requests  # third-party: pip install requests
def get_request(url):
    r = requests.get(url)
    print('{}:{} {}'.format(datetime.datetime.now(), url, r.status_code))
urls = ['https://www.baidu.com', 'https://www.tmall.com', 'https://www.jd.com']
pool = futures.ThreadPoolExecutor(max_workers=2)
tasks = []
for url in urls:
    task = pool.submit(get_request, url)
    tasks.append(task)
# Block until every task has finished (return_when defaults to ALL_COMPLETED)
futures.wait(tasks)
print('{}: Main thread'.format(datetime.datetime.now()))
time.sleep(2)
# Output result
2021-03-12 16:30:13.437042:https://www.baidu.com 200
2021-03-12 16:30:13.552700:https://www.jd.com 200
2021-03-12 16:30:14.117325:https://www.tmall.com 200
2021-03-12 16:30:14.118284: Main thread
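The return_when options are easier to see with a variant that stops at the first finished task. This sketch uses a hypothetical sleeper function in place of get_request and unpacks the (done, not_done) sets that futures.wait returns:

```python
import time
from concurrent import futures

def sleeper(seconds):
    # Hypothetical task that simply takes `seconds` to finish
    time.sleep(seconds)
    return seconds

with futures.ThreadPoolExecutor(max_workers=3) as pool:
    tasks = [pool.submit(sleeper, s) for s in (0.5, 0.05, 0.3)]
    # Returns as soon as at least one future has finished
    done, not_done = futures.wait(tasks, return_when=futures.FIRST_COMPLETED)
    first = [f.result() for f in done]
    print("finished first:", first)
    print("still running:", len(not_done))
```

The remaining tasks keep running in the background; here the with-block waits for them before the script moves on.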
5. Exception handling
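futures.as_completed(fs, timeout=None)
fs: the futures to watch; as_completed yields each one as it finishes, in completion order rather than submission order.
When you use concurrent.futures to run threads or processes, an exception raised inside a task does not terminate the program; it is stored in the task's Future and nothing visible happens unless you retrieve it. You can collect these exceptions with as_completed, as in the following code: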
import datetime
import time
from concurrent import futures
import requests  # third-party: pip install requests
def get_request(url):
    r = requests.get(url)
    print('{}:{} {}'.format(datetime.datetime.now(), url, r.status_code))
# The first URL deliberately has no scheme, so requests raises an error for it
urls = ['www.baidu.com', 'https://www.tmall.com', 'https://www.jd.com']
# Create a thread pool
pool = futures.ThreadPoolExecutor(max_workers=2)
tasks = []
for url in urls:
    task = pool.submit(get_request, url)
    tasks.append(task)
# Exception trapping: as_completed yields each future as it finishes
for future in futures.as_completed(tasks):
    # future.result() would wait for the task and re-raise its exception,
    # interrupting the main thread.
    # future.exception() returns the exception (or None if the task succeeded)
    # without raising, so the main thread keeps running.
    print(future.exception())
futures.wait(tasks)
print('{}: Main thread'.format(datetime.datetime.now()))
time.sleep(2)
# Output result
Invalid URL 'www.baidu.com': No schema supplied. Perhaps you meant http://www.baidu.com?
2021-03-12 17:24:26.984933:https://www.tmall.com 200
None
2021-03-12 17:24:26.993939:https://www.jd.com 200
None
2021-03-12 17:24:26.994937: Main thread
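Printing exception() is one option. Another common pattern is to call result() inside try/except, which re-raises the task's exception in the main thread where you can handle it task by task. The sketch below keeps a dict from each future back to its input so failures can be reported by name; parse_port and the inputs are hypothetical:

```python
from concurrent import futures

def parse_port(text):
    # Hypothetical task: raises ValueError for input that is not an integer
    return int(text)

inputs = ["80", "443", "http"]
ok, failed = {}, {}

with futures.ThreadPoolExecutor(max_workers=2) as pool:
    # Map each future back to the input that produced it
    future_to_input = {pool.submit(parse_port, s): s for s in inputs}
    for future in futures.as_completed(future_to_input):
        source = future_to_input[future]
        try:
            # result() re-raises the task's exception in the main thread
            ok[source] = future.result()
        except ValueError as exc:
            failed[source] = exc

print(ok)              # values 80 and 443; key order may vary
print(sorted(failed))  # ['http']
```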
Multi-process programming works the same way: replace ThreadPoolExecutor with ProcessPoolExecutor.
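Two details differ with processes: the function and its arguments must be picklable so they can be sent to the worker processes, and the code that starts the pool should sit behind an if __name__ == "__main__" guard, because with the "spawn" start method each worker imports the main module. A minimal sketch, using the standard math.factorial as the task and a hypothetical parallel_factorials helper:

```python
import math
from concurrent.futures import ProcessPoolExecutor

def parallel_factorials(numbers, max_workers=2):
    # math.factorial is importable in every worker process, so it can be
    # pickled and sent to the pool; a lambda, for example, could not.
    with ProcessPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(math.factorial, numbers))

# The guard stops worker processes from re-running this code when they
# import the main module (required with the "spawn" start method).
if __name__ == "__main__":
    print(parallel_factorials([5, 10, 20]))
```

Processes sidestep the GIL, so this variant suits CPU-bound work, at the cost of pickling arguments and results between processes.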