Principle and Implementation of Python Multiprocessing
- 2021-08-31 08:20:54
- OfStack
1 Basic concepts of a process
What is a process?
A process is the dynamic execution of a program over a data set. A process generally consists of three parts: the program, the data set, and the process control block (PCB). The program describes what the process should do and how to do it; the data set is the collection of resources the program needs while executing; the process control block records the external characteristics of the process and describes how its execution changes over time. The system uses the PCB to control and manage the process, and the PCB is the only evidence by which the system perceives the process's existence.
Process lifecycle: New, Ready (Runnable), Running, Blocked, Terminated.
Process states (one common classification): active process (Active), visible process (Visible), background process (Background), service process (Service), empty process.
2 Parent and child processes
The Linux operating system provides a fork() function to create child processes. This function is special: it is called once but returns twice, because the operating system copies the current process (the parent) to create a child process, and then returns in both the parent and the child. In the child process the return value is always 0, while in the parent it is the PID of the child. We can therefore tell whether we are currently executing in the parent or the child by checking whether the return value is 0.
Python also provides a fork() function, located in the os module.
# -*- coding: utf-8 -*-
import os
import time

print("Before creating a child process: pid=%s, ppid=%s" % (os.getpid(), os.getppid()))
pid = os.fork()
if pid == 0:
    print("Child process information: pid=%s, ppid=%s" % (os.getpid(), os.getppid()))
    time.sleep(5)
else:
    print("Parent process information: pid=%s, ppid=%s" % (os.getpid(), os.getppid()))
    # pid, result = os.wait()  # blocks until a child exits; pid is the reaped child's pid
    time.sleep(5)
    # print("Parent process: reaped child process pid=%d" % pid)
    # print("Parent process: child exit status result=%d" % result)
# The following line is printed twice: once in the parent and once in the child.
# In the parent, the return value of fork() is the pid of the created child, which is greater than 0.
print("After fork: pid=%s, ppid=%s" % (os.getpid(), os.getppid()))
2.1 How to distinguish parent and child processes?
The child process is created by the parent process via fork(): pid = os.fork()
If the return value pid is 0, the code is running in the child process; otherwise it is running in the parent.
Since fork() is a Unix/Linux concept, if you want cross-platform code it is best to create child processes with the subprocess module instead.
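As a sketch of that cross-platform alternative (the snippet passed to the child here is purely illustrative), the subprocess module can spawn a child interpreter and collect its output on any platform:

```python
import subprocess
import sys

def run_child(code):
    """Run a snippet of Python in a child process and return its stdout."""
    # sys.executable is the current interpreter, so this works on Windows too
    result = subprocess.run([sys.executable, "-c", code],
                            capture_output=True, text=True, check=True)
    return result.stdout.strip()

if __name__ == "__main__":
    out = run_child("import os; print(os.getpid())")
    print("child printed its pid:", out)
```

Unlike os.fork(), subprocess starts a fresh interpreter rather than copying the current process, so the child shares no in-memory state with the parent.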
2.2 How do child processes recycle?
In Python, the os.wait() method is used to reclaim the resources occupied by a child process:
pid, result = os.wait()  # blocks, waiting for a child process to finish so it can be reaped
If a child process has exited but the parent has not yet reaped it, the child is a zombie process. (If instead the parent dies first, the child becomes an orphan process and is adopted and reaped by the init process.)
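A minimal Unix-only sketch of reaping: the child exits with a chosen status, and between its exit and the parent's os.waitpid() call it sits in the zombie state (the exit status 7 here is arbitrary):

```python
import os

def spawn_and_reap():
    """Fork a child, let it exit, then reap it so it does not remain a zombie."""
    pid = os.fork()
    if pid == 0:
        # child: exit immediately with status 7 (os._exit skips cleanup handlers)
        os._exit(7)
    # parent: until os.waitpid() returns, the exited child is a zombie
    reaped_pid, status = os.waitpid(pid, 0)  # 0 = block until this child exits
    return reaped_pid == pid, os.WEXITSTATUS(status)

if __name__ == "__main__":
    same, code = spawn_and_reap()
    print("reaped the right child:", same, "exit code:", code)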
3 Python Process Module
Python's multiprocessing module offers several ways to create processes, and the ways of creating a process and reclaiming its resources differ between them. The following analyzes three approaches: fork, Process, and Pool.
3.1 fork()
import os

pid = os.fork()  # create a child process
os.wait()        # wait for the child process to finish and release its resources

A pid of 0 indicates the child process.
Disadvantages:
1. Poor portability: it can only be used on Unix-like systems and is unavailable on Windows;
2. Poor scalability: when many processes are needed, process management becomes very complicated;
3. It can produce "orphan" and "zombie" processes, whose resources must be reaped manually.
Advantages:
It is the system's low-level process-creation mechanism, so it runs efficiently.
3.2 The Process class
The multiprocessing module provides the Process class for creating new processes:
# -*- coding: utf-8 -*-
import os
from multiprocessing import Process
import time

def fun(name):
    print("2 Child process information: pid=%s, ppid=%s" % (os.getpid(), os.getppid()))
    print("hello " + name)

def test():
    print('ssss')

if __name__ == "__main__":
    print("1 Main process information: pid=%s, ppid=%s" % (os.getpid(), os.getppid()))
    ps = Process(target=fun, args=('jingsanpang', ))
    print("111 ##### ps pid: " + str(ps.pid) + ", ident:" + str(ps.ident))
    print("3 Process information: pid=%s, ppid=%s" % (os.getpid(), os.getppid()))
    print(ps.is_alive())  # before start(), is_alive is False (the OS process has not been created)
    ps.start()
    print(ps.is_alive())  # after start(), is_alive is True (the OS process has been created)
    print("222 #### ps pid: " + str(ps.pid) + ", ident:" + str(ps.ident))
    print("4 Process information: pid=%s, ppid=%s" % (os.getpid(), os.getppid()))
    ps.join()  # wait for the child process to finish its task, similar to os.wait()
    print(ps.is_alive())
    print("5 Process information: pid=%s, ppid=%s" % (os.getpid(), os.getppid()))
    ps.terminate()  # terminate the process
    print("6 Process information: pid=%s, ppid=%s" % (os.getpid(), os.getppid()))
Features:
1. A Process object can create a process, but the Process object is not itself the process; deleting the object is not directly related to whether the system resources are reclaimed.
2. After the main process finishes, it waits for its child processes and reclaims their resources by default, so no manual reaping is needed. join() controls the order in which child processes end, and it also reaps zombie processes internally.
3. When a Process is created, the child process gets a complete copy of the main process's Process object, so there is one Process object in the main process and one in the child; p.start() starts the child process, while the Process object in the main process remains a static object and is not executed.
4. When a child process finishes, a zombie process is produced; it is reaped by join(), and starting another process via start() will also reap existing zombies. This is why join() should be called.
5. On Windows, the system automatically clears the child's Process object as soon as the child ends; on Linux, without join() or start(), the child's Process object is cleared only when the main process ends.
You can also create a process by subclassing Process and overriding its run() method.
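A minimal sketch of that subclassing style (the class name SquareWorker and the result queue are illustrative, not from the original): start() launches the new process, which executes the overridden run() method.

```python
import multiprocessing

class SquareWorker(multiprocessing.Process):
    """A process that squares a number and reports the result via a queue."""
    def __init__(self, n, result_queue):
        super().__init__()          # must initialize the Process base class
        self.n = n
        self.result_queue = result_queue

    def run(self):
        # run() is the body of the new process; start() invokes it
        self.result_queue.put(self.n * self.n)

def square_in_child(n):
    q = multiprocessing.Queue()
    worker = SquareWorker(n, q)
    worker.start()
    result = q.get()   # blocks until the child puts its answer
    worker.join()      # reap the child process
    return result

if __name__ == "__main__":
    print(square_in_child(6))  # prints 36
```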
3.3 Process pool: Pool (multiple processes)
import multiprocessing
import time

def work(msg):
    mult_proces_name = multiprocessing.current_process().name
    print('process: ' + mult_proces_name + '-' + msg)

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=5)  # create a pool of 5 worker processes
    for i in range(20):
        msg = "process %d" % (i)
        pool.apply_async(work, (msg, ))
    pool.close()  # close the pool: no more tasks can be submitted to it
    pool.join()   # wait for all workers in the pool to finish; must be called after close()
    print("Sub-process all done.")
In the code above, pool.apply_async() is a variant of apply(): apply_async() is the non-blocking, parallel version, while apply() is the blocking version. With apply(), the main process blocks until the function finishes executing. (In Python 2, apply() was also a built-in function with equivalent behavior; that built-in was removed in Python 3.) You can see that the output is not in the order of the for loop.
Multiple child processes and return values
apply_async() does not return the called function's value directly. In the code above that created multiple child processes, if the function func returns a value, then pool.apply_async(func, (msg,)) returns a result object from which each process's return value can later be fetched (note: it is the object, not the value itself).
import multiprocessing
import time

def func(msg):
    return multiprocessing.current_process().name + '-' + msg

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=4)  # create a pool of 4 worker processes
    results = []
    for i in range(20):
        msg = "process %d" % (i)
        results.append(pool.apply_async(func, (msg, )))
    pool.close()  # close the pool: no more tasks can be submitted; must be called before join()
    pool.join()   # wait for all workers in the pool to finish
    print("Sub-process(es) done.")
    for res in results:
        print(res.get())
Unlike the previous output, this output is in order, because the results are fetched from the result objects in submission order.
If the machine has 8 cores and you create 8 processes, running the top command on Ubuntu and pressing 1 shows that the utilization of each CPU is fairly even.
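When every task runs the same function, Pool.map is a simpler alternative to a loop of apply_async calls; a small sketch, with an arbitrary worker count of 4 and an illustrative squaring job:

```python
import multiprocessing

def square(x):
    return x * x

def parallel_squares(values, workers=4):
    # Pool.map blocks until every task is done and returns results in input order
    with multiprocessing.Pool(processes=workers) as pool:
        return pool.map(square, values)

if __name__ == "__main__":
    print(parallel_squares(range(10)))  # prints [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```

Using the pool as a context manager terminates the workers automatically, so no explicit close()/join() pair is needed here.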
4 Inter-process communication (IPC)
Pipe: a pipe is a half-duplex communication mechanism; data can flow in only one direction, and it can only be used between related processes. "Related" usually means a parent-child relationship.
Named pipe (FIFO): a named pipe is also half-duplex, but it allows communication between unrelated processes.
Message queue (MessageQueue): a message queue is a linked list of messages stored in the kernel and identified by a message queue identifier. Message queues overcome the drawbacks that signals carry little information, that pipes can only carry unformatted byte streams, and that pipe buffer sizes are limited.
Shared memory (SharedMemory): shared memory maps a segment of memory so that it can be accessed by other processes. The segment is created by one process but can be accessed by many. Shared memory is the fastest IPC mechanism; it was designed specifically to address the inefficiency of the other mechanisms, and it is often used together with other mechanisms, such as semaphores, to achieve synchronization and communication between processes.
Of the inter-process communication mechanisms above, the message queue is the most frequently used.
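In Python, multiprocessing exposes shared memory through Value and Array. This sketch increments a shared counter from several processes, guarded by the Value's built-in lock (the process and iteration counts are illustrative):

```python
import multiprocessing

def add_many(counter, times):
    for _ in range(times):
        with counter.get_lock():   # the lock makes the read-modify-write atomic
            counter.value += 1

def shared_count(n_procs=4, times=1000):
    counter = multiprocessing.Value('i', 0)  # 'i' = C int, placed in shared memory
    procs = [multiprocessing.Process(target=add_many, args=(counter, times))
             for _ in range(n_procs)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    return counter.value

if __name__ == "__main__":
    print(shared_count())  # prints 4000
```

Without get_lock(), concurrent `counter.value += 1` operations could interleave and lose updates, since the increment is not atomic.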
(1) Pipe
import multiprocessing

def foo(conn):
    conn.send('hello father')  # send a message into the pipe
    print(conn.recv())

if __name__ == '__main__':
    conn1, conn2 = multiprocessing.Pipe(True)  # two ends, both readable and writable; Pipe(False) makes it one-way
    p = multiprocessing.Process(target=foo, args=(conn1,))  # the child process uses conn1 and calls foo
    p.start()
    print(conn2.recv())   # the main process receives on conn2, reading the message from the pipe
    conn2.send('hi son')  # the main process sends on conn2
    p.join()
(2) Message queue: Queue
Queue is a multi-process-safe queue; you can use Queue to pass data between processes.
Some common methods of Queue:
- Queue.qsize(): returns the number of messages currently in the queue;
- Queue.empty(): returns True if the queue is empty, otherwise False;
- Queue.full(): returns True if the queue is full, otherwise False;
- Queue.get(): gets one message from the queue and removes it; a timeout can be passed;
- Queue.get_nowait(): equivalent to Queue.get(False); raises the Empty exception if no value is available;
- Queue.put(): puts a value into the queue; a timeout can be passed;
- Queue.put_nowait(): equivalent to Queue.put(item, False); raises the Full exception if the queue is full.
Example:
from multiprocessing import Process, Queue
import time

def write(q):
    for i in ['A', 'B', 'C', 'D', 'E']:
        print('Put %s to queue' % i)
        q.put(i)
        time.sleep(0.5)

def read(q):
    while True:
        v = q.get(True)
        print('get %s from queue' % v)

if __name__ == '__main__':
    q = Queue()
    pw = Process(target=write, args=(q,))
    pr = Process(target=read, args=(q,))
    print('write process = ', pw)
    print('read process = ', pr)
    pw.start()
    pr.start()
    pw.join()
    # read() loops forever, so pr.join() would block indefinitely; terminate pr instead
    pr.terminate()
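The non-blocking variants listed above can be sketched in a single process; the queue module supplies the Empty and Full exception classes that multiprocessing.Queue raises (the item values here are illustrative):

```python
import multiprocessing
import queue  # for the Empty and Full exception classes

def demo_nowait():
    q = multiprocessing.Queue(maxsize=2)
    q.put_nowait('A')
    q.put_nowait('B')
    try:
        q.put_nowait('C')        # queue is full: capacity is 2
        overflowed = False
    except queue.Full:
        overflowed = True
    # use a timeout here: items pass through a feeder thread, so an immediate
    # get_nowait() right after put() could race and raise Empty
    first = q.get(timeout=1)
    second = q.get(timeout=1)
    try:
        q.get_nowait()           # queue is now empty
        underflowed = False
    except queue.Empty:
        underflowed = True
    return overflowed, first, second, underflowed

if __name__ == "__main__":
    print(demo_nowait())  # prints (True, 'A', 'B', True)
```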
Queue and Pipe only implement data passing between processes, not data sharing, i.e. they do not let one process change another process's data in place.
Note: Interprocess communication should avoid sharing data as much as possible
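When genuine sharing is unavoidable, multiprocessing.Manager provides proxy objects (dict, list, and others) that several processes can mutate through a manager server process; a minimal sketch, with illustrative keys and values:

```python
import multiprocessing

def record(shared, key, value):
    # the proxy forwards this mutation to the manager's server process
    shared[key] = value

def fill_shared_dict():
    with multiprocessing.Manager() as manager:
        shared = manager.dict()
        procs = [multiprocessing.Process(target=record, args=(shared, i, i * 10))
                 for i in range(3)]
        for p in procs:
            p.start()
        for p in procs:
            p.join()
        return dict(shared)  # copy out before the manager shuts down

if __name__ == "__main__":
    print(fill_shared_dict())  # prints {0: 0, 1: 10, 2: 20}
```

Manager proxies are slower than Value/Array shared memory because every access is a round trip to the server process, but they support richer data types.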
5 A multi-process producer-consumer implementation
The following implements the producer-consumer model with multiple processes:
import multiprocessing
import time

class MultiProcessProducer(multiprocessing.Process):
    def __init__(self, num, queue):
        """Constructor"""
        multiprocessing.Process.__init__(self)
        self.num = num
        self.queue = queue

    def run(self):
        t1 = time.time()
        print('producer start ' + str(self.num))
        for i in range(1000):
            self.queue.put((i, self.num))
            # print('producer put', i, self.num)
        t2 = time.time()
        print('producer exit ' + str(self.num))
        use_time = str(t2 - t1)
        print('producer ' + str(self.num) + ', use_time: ' + use_time)

class MultiProcessConsumer(multiprocessing.Process):
    def __init__(self, num, queue):
        """Constructor"""
        multiprocessing.Process.__init__(self)
        self.num = num
        self.queue = queue

    def run(self):
        t1 = time.time()
        print('consumer start ' + str(self.num))
        while True:
            d = self.queue.get()
            if d is not None:
                # print('consumer get', d, self.num)
                continue
            else:
                break  # None is the shutdown sentinel
        t2 = time.time()
        print('consumer exit ' + str(self.num))
        print('consumer ' + str(self.num) + ', use time: ' + str(t2 - t1))

def main():
    # create the queue
    queue = multiprocessing.Queue()

    # create processes
    producer = []
    for i in range(5):
        producer.append(MultiProcessProducer(i, queue))
    consumer = []
    for i in range(5):
        consumer.append(MultiProcessConsumer(i, queue))

    # start processes
    for i in range(len(producer)):
        producer[i].start()
    for i in range(len(consumer)):
        consumer[i].start()

    # wait for the producers to exit, then send one None sentinel per consumer
    for i in range(len(producer)):
        producer[i].join()
    for i in range(len(consumer)):
        queue.put(None)
    for i in range(len(consumer)):
        consumer[i].join()
    print('all done finish')

if __name__ == "__main__":
    main()
6 Summary
There are two ways to create multiple processes in Python:
(1) fork a child process with os.fork()
(2) use the multiprocessing library to create child processes
It should be noted that queue.Queue is thread-safe but not process-safe, so multi-process code generally uses the thread-safe and process-safe multiprocessing.Queue().
In addition, a process pool is implemented with multiprocessing.Pool: pool = multiprocessing.Pool(processes=3) creates a pool, pool.apply_async provides the non-blocking mode, and pool.apply provides the blocking mode.
Of the apply_async and apply functions, the former is non-blocking and the latter is blocking; the difference in running time can approach a factor equal to the size of the process pool.
The results of non-blocking calls can be collected via results.append(pool.apply_async(func, (msg,))) and read back later with get().