Implementing Multithreading in Python with the concurrent Module

  • 2021-10-27 08:08:02
  • OfStack

Contents
Introduction
Operate multi-threaded/multi-process
1. Create a thread pool
2. submit
3. map
4. wait
5. Exception handling

Introduction

I have written several posts about multithreading with threading before. Today I will cover another built-in Python library, concurrent. concurrent.futures was introduced in Python 3.2 and lets you write a thread pool or process pool in only a few lines of code. For compute-bound tasks its efficiency is on par with Pool and ThreadPool from multiprocessing.pool, and for IO tasks it can be several times faster thanks to the Future concept. With threading, you have to maintain your own queues to prevent deadlocks, and the readability of the code suffers. By contrast, the thread pool provided by concurrent is very convenient: you don't have to worry about deadlocks or write the thread pool code yourself. Because of its asynchronous style, it is especially well suited to IO tasks.

concurrent is really easy to use. It mainly provides ThreadPoolExecutor and ProcessPoolExecutor: one for multithreading, one for multiprocessing. But concurrent is essentially a wrapper around threading and multiprocessing, as its source code shows, so at the lowest level it is not truly asynchronous.
ThreadPoolExecutor provides its own task queue, so you don't need to write one yourself. The so-called thread pool simply compares the current number of threads with max_workers; if the count is smaller than max_workers, a new thread may be created to execute the task.
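
The effect of max_workers can be observed with a small sketch. The task function record_worker and the sleep duration are hypothetical, chosen only for illustration; the point is that six queued tasks are served by at most two worker threads.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def record_worker(_):
    # Sleep briefly so tasks overlap, then report which thread ran this task.
    time.sleep(0.05)
    return threading.current_thread().name


# Six tasks are queued, but the pool never starts more than two threads.
with ThreadPoolExecutor(max_workers=2) as pool:
    worker_names = set(pool.map(record_worker, range(6)))

print(worker_names)
```

The exact thread names depend on the interpreter, but the set should never hold more than two of them.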

Operate multi-threaded/multi-process

1. Create a thread pool

The thread pool object is created with the ThreadPoolExecutor class, and max_workers sets the maximum number of threads that can run at once. The advantage of ThreadPoolExecutor is that you largely don't have to manage queues and thread synchronization yourself, which makes multithreaded programming simpler.


from concurrent import futures

pool = futures.ThreadPoolExecutor(max_workers = 2)
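
A pool can also be used as a context manager, which shuts it down automatically. The following is a minimal sketch; the double function is a hypothetical stand-in task that does not appear in the examples in this article.

```python
from concurrent.futures import ThreadPoolExecutor


def double(x):
    # Hypothetical stand-in task, used only for illustration.
    return x * 2


# Leaving the with block calls pool.shutdown(wait=True),
# so every submitted task has finished by the time we read the result.
with ThreadPoolExecutor(max_workers=2) as pool:
    future = pool.submit(double, 21)

doubled = future.result()
print(doubled)  # 42
```

This form is convenient in short scripts because no explicit cleanup is needed.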

2. submit

submit(self, fn, *args, **kwargs):

fn: the function to execute asynchronously
*args, **kwargs: the arguments passed to fn

The purpose of this method is to submit a callable as a task; it returns a Future object. As the output shows, this method does not block the main thread.


import requests,datetime,time
from concurrent import futures

def get_request(url):
    r = requests.get(url)
    print('{}:{}  {}'.format(datetime.datetime.now(),url,r.status_code))

urls = ['https://www.baidu.com','https://www.tmall.com','https://www.jd.com']
pool = futures.ThreadPoolExecutor(max_workers = 2)
for url in urls:
    task = pool.submit(get_request,url)
print('{}: Main thread '.format(datetime.datetime.now()))
time.sleep(2)


#  Output result 
2021-03-12 15:29:10.780141: Main thread 
2021-03-12 15:29:10.865425:https://www.baidu.com  200
2021-03-12 15:29:10.923062:https://www.tmall.com  200
2021-03-12 15:29:10.940930:https://www.jd.com  200
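
The Future returned by submit can also hand back the task's return value. Here is a minimal sketch without network access; slow_add is a hypothetical stand-in for an IO-bound call such as requests.get.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_add(a, b):
    # Hypothetical stand-in for an I/O-bound call such as requests.get.
    time.sleep(0.1)
    return a + b


with ThreadPoolExecutor(max_workers=2) as pool:
    # Positional and keyword arguments are forwarded to the function.
    pending = [pool.submit(slow_add, n, b=10) for n in range(3)]
    # result() blocks until that task finishes, then returns its value.
    totals = [future.result(timeout=5) for future in pending]

print(totals)  # [10, 11, 12]
```

Because the results are read in submission order, the list order matches the inputs even if the tasks finish in a different order.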

3. map

map(self, fn, *iterables, timeout=None, chunksize=1):

fn: the function to execute asynchronously
*iterables: one or more iterable objects

The second parameter of map is an iterable object such as a list or tuple, which makes it simpler to write than a submit loop. Like submit, the map call itself does not block the main thread. Iterating over the returned results does block, and yields them in input order.


import requests,datetime,time
from concurrent import futures



def get_request(url):
    r = requests.get(url)
    print('{}:{}  {}'.format(datetime.datetime.now(),url,r.status_code))

urls = ['https://www.baidu.com','https://www.tmall.com','https://www.jd.com']
pool = futures.ThreadPoolExecutor(max_workers = 2)
tasks = pool.map(get_request,urls)
print('{}: Main thread '.format(datetime.datetime.now()))
time.sleep(2)


#  Output result 
2021-03-12 16:14:04.854452: Main thread 
2021-03-12 16:14:04.938870:https://www.baidu.com  200
2021-03-12 16:14:05.033849:https://www.jd.com  200
2021-03-12 16:14:05.048952:https://www.tmall.com  200
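
map also collects return values, in the order of the inputs rather than the order of completion. Here is a small sketch of that behavior; delayed_square and its sleep times are hypothetical and serve only to make later inputs finish first.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def delayed_square(n):
    # Larger inputs finish first, so completion order differs from input order.
    time.sleep(0.03 * (3 - n))
    return n * n


with ThreadPoolExecutor(max_workers=3) as pool:
    # map returns an iterator right away; list() blocks until all results arrive.
    squares = list(pool.map(delayed_square, [0, 1, 2]))

print(squares)  # [0, 1, 4]
```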

4. wait

If you want the main thread to wait until the child threads have finished before continuing, you can use wait.

wait(fs, timeout=None, return_when=ALL_COMPLETED):

fs: all the tasks (Future objects) to wait for
return_when: one of three values
FIRST_COMPLETED: return as soon as any child thread completes.
FIRST_EXCEPTION: return as soon as any child thread raises an exception. If no exception is raised, it is equivalent to ALL_COMPLETED.
ALL_COMPLETED: the default; wait for all child threads to complete.

import requests,datetime,time
from concurrent import futures


def get_request(url):
    r = requests.get(url)
    print('{}:{}  {}'.format(datetime.datetime.now(),url,r.status_code))

urls = ['https://www.baidu.com','https://www.tmall.com','https://www.jd.com']
pool = futures.ThreadPoolExecutor(max_workers = 2)
tasks =[]
for url in urls:
    task = pool.submit(get_request,url)
    tasks.append(task)
futures.wait(tasks)
print('{}: Main thread '.format(datetime.datetime.now()))
time.sleep(2)


#  Output result 
2021-03-12 16:30:13.437042:https://www.baidu.com  200
2021-03-12 16:30:13.552700:https://www.jd.com  200
2021-03-12 16:30:14.117325:https://www.tmall.com  200
2021-03-12 16:30:14.118284: Main thread 
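
wait returns two sets, the finished and the unfinished futures, which matters when return_when is not the default. The sketch below assumes two hypothetical tasks, quick and blocked, and uses a threading.Event so that one task stays unfinished until the main thread releases it.

```python
import threading
from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED

release = threading.Event()


def quick():
    return "quick"


def blocked():
    # Stays unfinished until the main thread sets the event.
    release.wait(timeout=5)
    return "blocked"


with ThreadPoolExecutor(max_workers=2) as pool:
    fast = pool.submit(quick)
    slow = pool.submit(blocked)
    # Returns as soon as at least one future is finished.
    done, not_done = wait([fast, slow], return_when=FIRST_COMPLETED)
    first_done = fast in done
    still_running = slow in not_done
    release.set()  # let the blocked task finish so the pool can shut down

print(first_done, still_running)  # True True
```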

5. Exception handling

as_completed(fs, timeout=None)

fs: all the tasks (Future objects)

When using concurrent.futures for multithreading or multiprocessing, an error inside a task function often does not terminate the program; it appears as if nothing happened. The exceptions can be caught using as_completed, as in the following code:


import requests,datetime,time
from concurrent import futures

def get_request(url):
    r = requests.get(url)
    print('{}:{}  {}'.format(datetime.datetime.now(),url,r.status_code))

urls = ['www.baidu.com','https://www.tmall.com','https://www.jd.com']
#  Create a thread pool 
pool = futures.ThreadPoolExecutor(max_workers = 2)
tasks =[]
for url in urls:
    task = pool.submit(get_request,url)
    tasks.append(task)
#  Exception handling
for future in futures.as_completed(tasks):
    # future.result() would wait for the task, then re-raise its exception and interrupt the main thread
    # future.exception() returns the task's exception (or None) without raising, so the main thread keeps running
    print(future.exception())
futures.wait(tasks)
print('{}: Main thread '.format(datetime.datetime.now()))
time.sleep(2)


#  Output result 
Invalid URL 'www.baidu.com': No schema supplied. Perhaps you meant http://www.baidu.com?
2021-03-12 17:24:26.984933:https://www.tmall.com  200
None
2021-03-12 17:24:26.993939:https://www.jd.com  200
None
2021-03-12 17:24:26.994937: Main thread 
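
An alternative is to call result() inside try/except, which gives access to both return values and errors. The following is a sketch under assumptions: parse_positive and its inputs are hypothetical, and the dictionary that maps each Future back to its input is just one common way to attribute failures.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def parse_positive(text):
    # Hypothetical task: raises ValueError for non-numeric or non-positive input.
    value = int(text)
    if value <= 0:
        raise ValueError("not positive: {}".format(text))
    return value


inputs = ["3", "abc", "-1", "7"]
results, failures = {}, {}

with ThreadPoolExecutor(max_workers=2) as pool:
    # Map each Future back to its input so errors can be attributed.
    future_to_input = {pool.submit(parse_positive, s): s for s in inputs}
    for future in as_completed(future_to_input):
        source = future_to_input[future]
        try:
            results[source] = future.result()  # re-raises the task's exception
        except ValueError as exc:
            failures[source] = str(exc)

print(results)
print(sorted(failures))
```

Catching a specific exception type, as here, keeps unexpected errors visible instead of silently swallowing them.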

Multi-process programming is similar: just replace ThreadPoolExecutor with ProcessPoolExecutor.
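
A minimal sketch of the process-based variant follows. The names sum_of_squares and run_in_processes are hypothetical; the task is defined at module level because arguments and functions are pickled to reach the worker processes.

```python
from concurrent.futures import ProcessPoolExecutor


def sum_of_squares(n):
    # CPU-bound stand-in task (hypothetical); defined at module level so it can be pickled.
    return sum(i * i for i in range(n))


def run_in_processes(values):
    # Same interface as ThreadPoolExecutor, but each task runs in a separate process.
    with ProcessPoolExecutor(max_workers=2) as pool:
        return list(pool.map(sum_of_squares, values))


if __name__ == "__main__":
    # The guard is needed on platforms that start workers by re-importing this module.
    print(run_in_processes([10, 100, 1000]))
```

The interface is the same as in the thread examples, so switching between the two executors usually means changing only the class name.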
