Principle and Implementation of Python Multiprocessing

  • 2021-08-31 08:20:54
  • OfStack

1 Basic concepts of a process

What is a process?

A process is the dynamic execution of a program over a data set. A process generally consists of three parts: the program, the data set, and the process control block. The program describes which functions the process should perform and how to perform them; the data set comprises the resources the program needs during execution; and the process control block records the external characteristics of the process and describes its execution and state changes. The system uses the control block to control and manage the process, and it is the only sign by which the system perceives the process's existence.

Process lifecycle: created (New), ready (Runnable), running (Running), blocked (Blocked), and terminated (Destroyed)

Process states (one common classification): active (Active), visible (Visible), background (Background), service (Service), and empty process

2 Parent and child processes

The Linux operating system provides a fork() function to create a child process. This function is special: it is called once but returns twice, because the operating system copies the current (parent) process into a child process and then returns in both the parent and the child. The child process always gets a return value of 0, while the parent gets the PID of the child. We can therefore tell whether the code is currently executing in the parent or the child by checking whether the return value is 0.

Python also provides a fork() function, located in the os module.


# -*- coding: utf-8 -*-
import os
import time

print("Before creating the child process: pid=%s, ppid=%s" % (os.getpid(), os.getppid()))

pid = os.fork()
if pid == 0:
    print("Child process: pid=%s, ppid=%s" % (os.getpid(), os.getppid()))
    time.sleep(5)
else:
    print("Parent process: pid=%s, ppid=%s" % (os.getpid(), os.getppid()))
    # pid here is the PID of the child that was just created
    #pid, result = os.wait()  # blocks until a child exits and is reaped
    time.sleep(5)
    #print("Parent: reaped child pid=%d" % pid)
    #print("Parent: child exit status result=%d" % result)

# The following line is printed twice: once in the parent process and once in
# the child. In the parent, the return value of fork() is the child's PID (> 0);
# in the child it is 0.
print("After fork: pid=%s, ppid=%s" % (os.getpid(), os.getppid()))

2.1 How to distinguish parent and child processes?

The child process is created by the parent through fork(): pid = os.fork()

If the return value pid is 0, the code is running in the child process; otherwise it is running in the parent, where pid is the child's PID.

Since fork() is a Unix/Linux concept, it is best to use the subprocess module (or multiprocessing) to create child processes if you need to work across platforms.
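A minimal cross-platform sketch using subprocess: it runs the Python interpreter itself as the child process (the printed message is an arbitrary example, not from the original).

```python
import subprocess
import sys

# Run the current Python interpreter as a child process.
# Unlike os.fork(), this works on Windows as well as POSIX systems.
result = subprocess.run(
    [sys.executable, "-c", "print('hello from child')"],
    capture_output=True,
    text=True,
)
print("child exit code:", result.returncode)
print("child stdout:", result.stdout.strip())
```

subprocess.run() blocks until the child exits and reaps it automatically, so no manual wait() call is needed.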

2.2 How are child process resources reclaimed?

In Python, the os.wait() method reclaims the resources occupied by a child process:

pid, result = os.wait()  # blocks until a child exits, then reaps it and returns its pid and exit status

A child process that has exited but has not yet been reaped by its parent is a zombie process. (If instead the parent dies before the child, the child becomes an orphan process and is adopted, and eventually reaped, by the init process.)

3 Python Process Module

Python's multiprocessing module offers several ways to create processes, and each differs in how processes are created and how their resources are reclaimed. The following analyzes three of them: fork, Process, and Pool.

3.1 fork()


import os

pid = os.fork()  # create a child process; fork() returns 0 in the child
if pid > 0:
    os.wait()    # in the parent: wait for the child to finish and release its resources

Disadvantages:
1. Poor portability: it works only on Unix-like systems and cannot be used on Windows;
2. Poor scalability: process management becomes very complicated when many processes are needed;
3. It can leave behind "orphan" and "zombie" processes, whose resources must be reclaimed manually.
Advantages:
As a low-level system mechanism for creating processes, it is highly efficient.

3.2 Process Process

The multiprocessing module provides the Process class for creating new processes.


# -*- coding: utf-8 -*-
import os
from multiprocessing import Process
import time

def fun(name):
    print("2 Child process: pid=%s, ppid=%s" % (os.getpid(), os.getppid()))
    print("hello " + name)


def test():
    print('ssss')


if __name__ == "__main__":
    print("1 Main process: pid=%s, ppid=%s" % (os.getpid(), os.getppid()))
    ps = Process(target=fun, args=('jingsanpang', ))
    print("111 ##### ps pid: " + str(ps.pid) + ", ident:" + str(ps.ident))  # both are None before start()
    print("3 Process info: pid=%s, ppid=%s" % (os.getpid(), os.getppid()))
    print(ps.is_alive())  # False before start(): the OS process does not exist yet
    ps.start()
    print(ps.is_alive())  # True after start(): the OS process has been created

    print("222 #### ps pid: " + str(ps.pid) + ", ident:" + str(ps.ident))
    print("4 Process info: pid=%s, ppid=%s" % (os.getpid(), os.getppid()))
    ps.join()  # wait for the child to finish its task, similar to os.wait()
    print(ps.is_alive())
    print("5 Process info: pid=%s, ppid=%s" % (os.getpid(), os.getppid()))
    ps.terminate()  # terminate the process (a no-op here, since it has already exited)
    print("6 Process info: pid=%s, ppid=%s" % (os.getpid(), os.getppid()))

Features:

1. A Process object can create a process, but the object itself is not the process; deleting the object is not directly related to whether the system resources have been reclaimed.
2. After the main process finishes, it waits for its child processes and reclaims their resources by default, so no manual cleanup is needed. join() controls the order in which child processes end, and internally also reaps zombie processes, reclaiming their resources.
3. When a Process is created, the child process receives a complete copy of the parent's Process object, so one copy exists in the parent and one in the child. p.start() starts the child process, while the copy in the parent remains a passive handle and is not executed.
4. A finished child process becomes a zombie until it is reaped; join() reaps it, and starting another process with start() also triggers reaping, so join() should normally be called.
5. On Windows, the child's Process object is cleared automatically as soon as the child ends; on Linux, without join() or a subsequent start(), it is cleared only when the main process ends.

You can also create a process by subclassing Process and overriding its run() method.

3.3 Process pool (Pool) for multiple processes


import multiprocessing
import time

def work(msg):
    process_name = multiprocessing.current_process().name
    print('process: ' + process_name + '-' + msg)


if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=5)  # create a pool of 5 worker processes
    for i in range(20):
        msg = "process %d" % (i)
        pool.apply_async(work, (msg, ))
    pool.close()  # close the pool: no new tasks may be submitted
    pool.join()   # wait for all workers in the pool to finish; must be called after close()
    print("Sub-process all done.")

pool.apply_async() in the code above is a variant of apply(): apply_async() is the parallel (non-blocking) version, while apply() is the blocking version, which stalls the main process until the function has finished executing. (Python 2 also had a built-in apply() function, but it is unrelated to Pool.apply() and was removed in Python 3.) Notice that the output is not in the order of the for loop.

Multiple child processes and return values

apply_async() does not return the called function's return value directly; it returns an AsyncResult object. In the earlier code that created multiple child processes, if func returns a value, the result of pool.apply_async(func, (msg,)) is an object from which the value can later be retrieved with get() (note: it is an object wrapping the value, not the value itself).


import multiprocessing
import time

def func(msg):
    return multiprocessing.current_process().name + '-' + msg

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=4)  # create a pool of 4 worker processes
    results = []
    for i in range(20):
        msg = "process %d" % (i)
        results.append(pool.apply_async(func, (msg, )))
    pool.close()  # close the pool: no more tasks may be submitted; must be called before join()
    pool.join()   # wait for all workers in the pool to finish
    print("Sub-process(es) done.")

    for res in results:
        print(res.get())

Unlike the previous output, this output is ordered, because the AsyncResult objects are collected in submission order and get() is called in that same order.

If the machine has 8 cores and you create 8 processes, running the top command on Ubuntu and pressing 1 shows that the utilization of each CPU is fairly even.

4 Inter-process communication methods

Pipe: a pipe is a half-duplex communication mechanism; data flows in only one direction, and it can only be used between related processes, usually a parent and its child.
Named pipe (FIFO): also half-duplex, but it allows communication between unrelated processes.
Message queue (MessageQueue): a linked list of messages stored in the kernel and identified by a message queue identifier. Message queues overcome the drawbacks of signals (which carry little information) and pipes (which carry only unformatted byte streams and have limited buffer sizes).
Shared memory (SharedMemory): a segment of memory mapped so that other processes can access it. The segment is created by one process but can be accessed by many. Shared memory is the fastest IPC mechanism and was designed specifically around the inefficiency of the other methods. It is often used together with other mechanisms, such as semaphores, to achieve synchronization and communication between processes.
Of these inter-process communication methods, the message queue is the most frequently used.

(1) Pipe


import multiprocessing

def foo(conn):
    conn.send('hello father')  # send a message into the pipe
    print(conn.recv())

if __name__ == '__main__':
    conn1, conn2 = multiprocessing.Pipe(True)  # two connection ends, both readable and writable; pass False for one-way communication
    p = multiprocessing.Process(target=foo, args=(conn1,))  # the child uses the conn1 end inside foo()
    p.start()
    print(conn2.recv())   # the main process reads from the pipe through conn2
    conn2.send('hi son')  # and sends through conn2
    p.join()              # reap the child process

(2) Message queue (Queue)

Queue is a multiprocess-safe queue; it can be used to pass data between processes.

Some common methods of Queue:

  • Queue.qsize(): returns the number of messages currently in the queue;
  • Queue.empty(): returns True if the queue is empty, otherwise False;
  • Queue.full(): returns True if the queue is full, otherwise False;
  • Queue.get(): gets one message from the queue and removes it; an optional timeout can be passed;
  • Queue.get_nowait(): equivalent to Queue.get(False); raises the Empty exception if no value is available;
  • Queue.put(): adds a value to the queue; an optional timeout can be passed;
  • Queue.put_nowait(): equivalent to Queue.put(item, False); raises the Full exception when the queue is full.
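The methods above can be exercised in a short single-process demonstration (no child processes are needed just to see the Full/Empty behavior); a minimal sketch:

```python
from multiprocessing import Queue
import queue  # home of the Empty and Full exception types

q = Queue(maxsize=2)   # a bounded queue with room for two items
q.put('A')
q.put('B')
print(q.full())        # True once maxsize items are queued

try:
    q.put_nowait('C')  # equivalent to q.put('C', block=False)
except queue.Full:
    print('Full: cannot add to a full queue')

first, second = q.get(), q.get()  # FIFO order: 'A' then 'B'
try:
    q.get_nowait()     # equivalent to q.get(block=False)
except queue.Empty:
    print('Empty: nothing left to get')
```

Note that the Empty and Full exceptions live in the standard queue module, not in multiprocessing.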

Case:


from multiprocessing import Process, Queue
import time


def write(q):
    for i in ['A', 'B', 'C', 'D', 'E']:
        print('Put %s to queue' % i)
        q.put(i)
        time.sleep(0.5)


def read(q):
    while True:
        v = q.get(True)
        print('get %s from queue' % v)


if __name__ == '__main__':
    q = Queue()
    pw = Process(target=write, args=(q,))
    pr = Process(target=read, args=(q,))
    print('write process = ', pw)
    print('read process = ', pr)
    pw.start()
    pr.start()
    pw.join()        # wait for the writer to finish
    pr.terminate()   # the reader loops forever, so it must be terminated rather than joined

Queue and Pipe only implement data passing; they do not implement data sharing, i.e. having one process directly modify data that another process sees.

Note: inter-process communication should avoid shared data as much as possible.

5 Multi-process realization of producers and consumers

The following implements the producer-consumer model with multiple processes.


import multiprocessing
import time


class MultiProcessProducer(multiprocessing.Process):
    def __init__(self, num, queue):
        """Constructor"""
        multiprocessing.Process.__init__(self)
        self.num = num
        self.queue = queue

    def run(self):
        t1 = time.time()
        print('producer start ' + str(self.num))
        for i in range(1000):
            self.queue.put((i, self.num))
            # print('producer put', i, self.num)
        t2 = time.time()
        print('producer exit ' + str(self.num))
        print('producer ' + str(self.num) + ', use_time: ' + str(t2 - t1))


class MultiProcessConsumer(multiprocessing.Process):
    def __init__(self, num, queue):
        """Constructor"""
        multiprocessing.Process.__init__(self)
        self.num = num
        self.queue = queue

    def run(self):
        t1 = time.time()
        print('consumer start ' + str(self.num))
        while True:
            d = self.queue.get()
            if d is not None:
                # print('consumer get', d, self.num)
                continue
            else:
                break  # a None item is the signal to stop
        t2 = time.time()
        print('consumer exit ' + str(self.num))
        print('consumer ' + str(self.num) + ', use_time: ' + str(t2 - t1))


def main():
    # create the shared queue
    queue = multiprocessing.Queue()

    # create 5 producers and 5 consumers
    producer = []
    for i in range(5):
        producer.append(MultiProcessProducer(i, queue))

    consumer = []
    for i in range(5):
        consumer.append(MultiProcessConsumer(i, queue))

    # start all processes
    for i in range(len(producer)):
        producer[i].start()

    for i in range(len(consumer)):
        consumer[i].start()

    # wait for the producers to finish
    for i in range(len(producer)):
        producer[i].join()

    # send one stop signal (None) per consumer
    for i in range(len(consumer)):
        queue.put(None)

    # wait for the consumers to finish
    for i in range(len(consumer)):
        consumer[i].join()

    print('all done')


if __name__ == "__main__":
    main()

6 Summary

There are two ways to create child processes in Python:

(1) fork a child process directly

(2) use the multiprocessing library to create child processes

It should be noted that queue.Queue (from the queue module) is thread-safe but not process-safe, so multi-process code generally uses multiprocessing.Queue(), which is safe across both threads and processes.

In addition, a process pool is implemented with multiprocessing.Pool: pool = multiprocessing.Pool(processes=3) creates the pool, pool.apply_async implements the non-blocking mode, and pool.apply implements the blocking mode.

apply_async is non-blocking and apply is blocking; the difference in total running time between the two is roughly a factor equal to the number of processes in the pool.

The results of non-blocking calls can be collected with results.append(pool.apply_async(func, (msg,))) and retrieved later via each result's get() method.

The above covers the principle and implementation of Python multiprocessing in detail. For more information about Python multiprocessing, please see the other related articles on this site!

