Python Concurrent Programming Fundamentals (II)

  • 2021-12-11 18:19:00
  • OfStack

Contents:

1. Multi-process implementation: Method 1; Method 2
2. Pros and cons of using processes: advantages; disadvantages
3. Process communication: Queue; Pipe
4. Manager
5. Process pool
Summary

1. Multi-process implementation

Method 1


# Method 1: wrap the task in a function and pass it to Process via target
from multiprocessing import Process
from time import sleep

def func1(arg):
    print(f'{arg} Begin ...')
    sleep(2)
    print(f'{arg} End ...')

if __name__ == "__main__":
    p1 = Process(target=func1, args=('p1',))
    p2 = Process(target=func1, args=('p2',))
    p1.start()
    p2.start()

Method 2:
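The original omits the code for this method. In tutorials of this kind, the second method is usually to subclass Process and override its run() method; the sketch below follows that pattern (the class name MyProcess is my own choice, not from the source):

```python
# Method 2 (assumed): subclass Process and put the task code in run()
from multiprocessing import Process
from time import sleep

class MyProcess(Process):
    def __init__(self, label):
        super().__init__()
        self.label = label

    def run(self):
        # run() is executed in the child process after start() is called
        print(f'{self.label} Begin ...')
        sleep(2)
        print(f'{self.label} End ...')

if __name__ == "__main__":
    p1 = MyProcess('p1')
    p2 = MyProcess('p2')
    p1.start()
    p2.start()
    p1.join()
    p2.join()
```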

2. Pros and cons of using a process

1. Advantages

  • Multiple cores can be used to execute tasks concurrently, improving execution efficiency
  • Each process runs unaffected by other processes and is easy to create
  • Separate address spaces keep data independent and safe

2. Disadvantages

Creating and destroying processes consumes relatively more system resources

3. Communication of processes

Python provides a variety of mechanisms for inter-process communication, mainly the following two:

1. The Queue class in the multiprocessing module provides methods for passing data between multiple processes

2. Pipe, also known as a "pipeline", is typically used for communication between exactly two processes, which sit at the two ends of the pipe

Pipe literally means "pipe", and this way of communicating is very similar to a real-life pipe: a pipe has two ports, and the two communicating processes sit at its two ends, one sending data and the other receiving it. The two connection objects returned by Pipe() support the following methods:

- send(obj): sends obj to the other end of the pipe, where it is received with recv(). Note that obj must be picklable; if the serialized object exceeds 32 MB, a ValueError is likely to be raised
- recv(): receives the data sent from the other end with send()
- close(): closes the connection
- poll([timeout]): returns whether the connection still has data that can be read
- send_bytes(buffer[, offset[, size]]): sends byte data. If the offset and size parameters are not specified, the whole buffer byte string is sent by default; if they are specified, only size bytes starting at offset in the buffer are sent. Data sent with this method should be received with recv_bytes() or recv_bytes_into()
- recv_bytes([maxlength]): receives data sent with the send_bytes() method, where maxlength specifies the maximum number of bytes to receive; returns the received byte data
- recv_bytes_into(buffer[, offset]): similar to recv_bytes(), except that the received data is placed into buffer (and the number of bytes received is returned)

1. Queue for inter-process communication


from multiprocessing import Process, current_process, Queue  # current_process refers to the current process
import os

def func(name, mq):
    print('Process ID {} got the data: {}'.format(os.getpid(), mq.get()))
    mq.put('shiyi')

if __name__ == "__main__":
    mq = Queue()  # multiprocessing.Queue (not queue.Queue) works across processes
    mq.put('yangyang')
    p1 = Process(target=func, args=('p1', mq))
    p1.start()
    p1.join()
    print(mq.get())

2. Pipe for inter-process communication (one end calls send(obj), the other recv())


from multiprocessing import Process, current_process, Pipe
import os

def func(name, con):
    print('Process ID {} got the data: {}'.format(os.getpid(), con.recv()))
    con.send('Hello!')

if __name__ == "__main__":
    con1, con2 = Pipe()
    p1 = Process(target=func, args=('p1', con1))
    p1.start()
    con2.send("hello!")
    p1.join()
    print(con2.recv())
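The example above uses send() and recv(); to make the byte-oriented methods listed earlier concrete as well, here is a minimal sketch that exercises send_bytes(), recv_bytes() and recv_bytes_into() with both ends of a pipe in a single process:

```python
# Sketch of the byte-oriented Pipe methods:
# send_bytes / recv_bytes / recv_bytes_into
from multiprocessing import Pipe

a, b = Pipe()
a.send_bytes(b'hello world')        # no offset/size: send the whole buffer
print(b.recv_bytes())               # b'hello world'

a.send_bytes(b'hello world', 6, 5)  # offset=6, size=5 -> only b'world' is sent
buf = bytearray(16)
n = b.recv_bytes_into(buf)          # received bytes are placed into buf
print(n, bytes(buf[:n]))            # 5 b'world'
a.close()
b.close()
```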

4. Manager

Manager provides a way to create shared data so that it can be shared among different processes.


from multiprocessing import Process, current_process, Manager
import os

def func(name, m_list, m_dict):
    print('Child process ID {} got the data: {}'.format(os.getpid(), m_list))
    print('Child process ID {} got the data: {}'.format(os.getpid(), m_dict))
    m_list.append('How do you do')
    m_dict['name'] = 'shiyi'

if __name__ == "__main__":
    print('Main process ID: {}'.format(current_process().pid))
    with Manager() as mgr:
        m_list = mgr.list()
        m_dict = mgr.dict()
        m_list.append('Hello!!')
        p1 = Process(target=func, args=('p1', m_list, m_dict))
        p1.start()
        p1.join()
        print(m_list)
        print(m_dict)

5. Process pool

Python provides a better way to manage multiple processes by using process pools.

A process pool keeps a specified number of worker processes available for use. When a new request is submitted to the pool, if the pool is not full, a new process is created to execute it; if the pool has already reached its maximum size, the request waits until an existing process becomes idle, and then it is executed.

Advantages of using a process pool:

1. Improves efficiency by saving the time spent creating processes, allocating memory space, and destroying processes

2. Saves memory space

Class/method reference (translated from the original table):

- Pool(processes): creates a process pool object; processes is the number of processes in the pool
- pool.apply_async(func, args, kwds): asynchronous execution; puts the task into the pool's queue. func is the task function; args passes positional arguments to func as a tuple; kwds passes keyword arguments to func as a dict. Returns an object representing the pooled task, whose get() method yields the task function's return value
- pool.apply(func, args, kwds): synchronous execution; puts the task into the pool's queue. func is the task function; args passes positional arguments as a tuple; kwds passes keyword arguments as a dict
- pool.close(): closes the pool (no new tasks are accepted)
- pool.join(): reclaims the pool (waits for the worker processes to finish)
- pool.map(func, iter): similar to Python's built-in map; submits the tasks to the pool. func is the function to execute; iter is the iterable of arguments

from multiprocessing import Pool
import os
from time import sleep

def func1(name):
    print(f"Current process ID: {os.getpid()}, {name}")
    sleep(2)
    return name

def func2(args):
    print(args)

if __name__ == "__main__":
    pool = Pool(5)
    pool.apply_async(func=func1, args=('t1',), callback=func2)
    pool.apply_async(func=func1, args=('t2',), callback=func2)
    pool.apply_async(func=func1, args=('t3',), callback=func2)
    pool.apply_async(func=func1, args=('t4',))
    pool.apply_async(func=func1, args=('t5',))
    pool.apply_async(func=func1, args=('t6',))
    pool.close()  # no more tasks may be submitted
    pool.join()   # wait for all workers to finish
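The table above notes that apply_async returns an object whose get() method yields the task function's return value; the example uses a callback instead, so here is a short sketch of get() itself, using a hypothetical square task of my own (not from the source):

```python
# Sketch: retrieving worker return values via the object
# returned by apply_async (its get() blocks until the task finishes)
from multiprocessing import Pool
from time import sleep

def square(x):  # hypothetical task function
    sleep(0.1)
    return x * x

if __name__ == "__main__":
    with Pool(3) as pool:
        results = [pool.apply_async(square, (i,)) for i in range(5)]
        values = [r.get() for r in results]  # collect each return value
        print(values)  # [0, 1, 4, 9, 16]
```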

from multiprocessing import Pool
import os
from time import sleep

def func1(name):
    print(f"Current process ID: {os.getpid()}, {name}")
    sleep(2)
    return name

if __name__ == "__main__":
    with Pool(5) as pool:
        results = pool.map(func1, ('t1', 't2', 't3', 't4', 't5', 't6', 't7', 't8'))
        for r in results:
            print(r)

Summary

That concludes this article. I hope it helps you, and I hope you will keep following this site for more content!
