Analysis of Basic Usage Examples of the Python threading and multiprocessing Modules

  • 2021-07-26 08:25:23
  • OfStack

This article illustrates the basic usage of Python's threading and multiprocessing modules with examples, shared here for your reference:

Preface

Over the past two days I needed concurrency for a small project, so I studied concurrent programming in Python. Concurrency here comes down to multithreading and multiprocessing. I started with the threading module, because I remembered threads as "lightweight", "fast to switch", "sharing the resources of their process" and so on. The water turned out to be deeper than I expected, so I ended up with a better substitute, the multiprocessing module. Below are some notes from that experience.

The code below was tested on Ubuntu 10.04 with Python 2.6.5.

1. Create threads using the threading module

1. Three ways to create a thread

(1) Pass in a function

This is the most basic method: call the constructor of the Thread class in threading with the parameter target=func, then call the start() method on the returned Thread instance. The thread then starts running and executes the function func. If func takes parameters, pass them to the Thread constructor as args=(...). The sample code is as follows:


#!/usr/bin/python
#-*-coding:utf-8-*-
import threading
# Function executed by the thread
def counter(n):
  cnt = 0
  for i in xrange(n):
    for j in xrange(i):
      cnt += j
  print cnt
if __name__ == '__main__':
  # Create a Thread object, passing in the function counter and its argument 1000
  th = threading.Thread(target=counter, args=(1000,))
  # Start the thread
  th.start()
  # The main thread blocks until the child thread ends
  th.join()

This code is straightforward; counter is just a dull double loop. The line to note is th.join(): the main thread blocks until the thread represented by th has finished executing. Without this line, the main thread reaches the end of the script immediately (the interpreter still waits for non-daemon threads before the process actually exits). The name join is a little obscure. It is easier to read it as "while th.is_alive(): time.sleep(1)". The effect is the same, but as you will see later, join has a pitfall.
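To make the comparison concrete, here is a minimal sketch that waits for a thread both ways. It is written for Python 3 rather than the Python 2.6 used in this article, and the helper names (worker, wait_with_join, wait_by_polling) are made up for illustration.

```python
import threading
import time


def worker(results, delay):
    """Simulate some work, then record that it finished."""
    time.sleep(delay)
    results.append("done")


def wait_with_join(delay=0.2):
    """Start a thread and block in join() until it ends."""
    results = []
    th = threading.Thread(target=worker, args=(results, delay))
    th.start()
    th.join()  # returns only after worker() has returned
    return results


def wait_by_polling(delay=0.2, interval=0.05):
    """Same effect, but the main thread wakes up periodically."""
    results = []
    th = threading.Thread(target=worker, args=(results, delay))
    th.start()
    while th.is_alive():
        time.sleep(interval)
    return results


if __name__ == "__main__":
    print(wait_with_join())
    print(wait_by_polling())
```

Both functions return only once the worker has finished; the polling version simply gives the main thread a chance to do something else between checks.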

(2) Pass in a callable object

Many Python objects are callable, that is, they can be invoked with the function-call operator "()" (see Chapter 14 of Core Python Programming). Instances of a class can be callable too: calling an instance automatically invokes its special method __call__(). So another way to create a thread is to hand it an object whose class defines the __call__ method. The sample code is as follows:


#!/usr/bin/python
#-*-coding:utf-8-*-
import threading
# Callable class
class Callable(object):
  def __init__(self, func, args):
    self.func = func
    self.args = args
  def __call__(self):
    apply(self.func, self.args)
# Function executed by the thread
def counter(n):
  cnt = 0
  for i in xrange(n):
    for j in xrange(i):
      cnt += j
  print cnt
if __name__ == '__main__':
  # Create a Thread object, passing in a Callable object that was
  # initialized with the function counter and its argument 1000
  th = threading.Thread(target=Callable(counter, (1000,)))
  # Start the thread
  th.start()
  # The main thread blocks until the child thread ends
  th.join()

The key line in this example is apply(self.func, self.args), which calls the function object passed in at initialization with its arguments.
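The apply built-in exists only in Python 2. As a hedged sketch of the same idea in Python 3, the call can be written with argument unpacking instead. The result attribute and the run_in_thread helper below are additions for illustration, not part of the original example.

```python
import threading


class Callable(object):
    """Bundle a function with its arguments so the pair can be called later."""

    def __init__(self, func, args):
        self.func = func
        self.args = args
        self.result = None  # illustrative addition: keep the return value

    def __call__(self):
        # Python 3 replacement for apply(self.func, self.args)
        self.result = self.func(*self.args)


def counter(n):
    cnt = 0
    for i in range(n):
        for j in range(i):
            cnt += j
    return cnt


def run_in_thread(task):
    """Run a callable object in a new thread and return its stored result."""
    th = threading.Thread(target=task)
    th.start()
    th.join()
    return task.result


if __name__ == "__main__":
    print(run_in_thread(Callable(counter, (100,))))
```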

(3) Inheriting the Thread class

This approach implements custom thread behavior by inheriting from the Thread class and overriding its run method. The example code is as follows:


#!/usr/bin/python
#-*-coding:utf-8-*-
import threading
def counter():
  cnt = 0
  for i in xrange(10000):
    for j in xrange(i):
      cnt += j
class SubThread(threading.Thread):
  def __init__(self, name):
    # Initialize the parent class first
    threading.Thread.__init__(self, name=name)
  # run() is what the new thread executes after start() is called
  def run(self):
    i = 0
    while i < 4:
      print self.name,'counting...\n'
      counter()
      print self.name,'finish\n'
      i += 1
if __name__ == '__main__':
  th = SubThread('thread-1')
  th.start()
  th.join()
  print 'all done'

This example defines a SubThread class that inherits from the Thread class and overrides the run method, which calls counter four times and prints some information. In the constructor, remember to call the constructor of the parent class for initialization.

2. Restrictions of Python multithreading

Python multithreading has an annoying restriction, the global interpreter lock (GIL): only one thread can use the interpreter at any one time. It is like a single-core CPU running multiple programs, where everyone takes turns. This is called "concurrent", not "parallel". The explanation in the manual is that the lock guarantees the correctness of the object model. The trouble this lock causes is that if one compute-intensive thread is occupying the CPU, the other threads have to wait. Imagine having such a thread among yours, how tragic: multithreading has effectively been made serial. The module is not useless, though. The manual says that for IO-intensive tasks, threads release the interpreter during IO, so other threads get a chance to use it. Whether to use this module therefore depends on the type of task at hand.
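One practical use of a subclass is to keep a result on the thread object itself. The following is a small sketch in Python 3 (not the Python 2.6 of this article); SumThread, its total attribute and the compute helper are hypothetical names chosen for illustration.

```python
import threading


class SumThread(threading.Thread):
    def __init__(self, name, n):
        super().__init__(name=name)  # initialize the parent class first
        self.n = n
        self.total = None

    def run(self):
        # start() arranges for run() to execute in the new thread
        self.total = sum(range(self.n))


def compute(n):
    th = SumThread("thread-1", n)
    th.start()
    th.join()  # after join() returns, run() has finished
    return th.name, th.total


if __name__ == "__main__":
    print(compute(5))
```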
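The IO case can be sketched as follows, in Python 3 rather than this article's Python 2.6. Here time.sleep stands in for a blocking IO call (an assumption for illustration; like real blocking IO, it releases the interpreter lock while waiting), and the function names are made up.

```python
import threading
import time


def fake_io(delay):
    # time.sleep releases the GIL while waiting, as blocking IO does
    time.sleep(delay)


def run_sequentially(n, delay):
    """Perform n waits one after another and return the elapsed time."""
    start = time.monotonic()
    for _ in range(n):
        fake_io(delay)
    return time.monotonic() - start


def run_in_threads(n, delay):
    """Perform n waits in n threads and return the elapsed time."""
    start = time.monotonic()
    threads = [threading.Thread(target=fake_io, args=(delay,)) for _ in range(n)]
    for th in threads:
        th.start()
    for th in threads:
        th.join()
    return time.monotonic() - start


if __name__ == "__main__":
    print("sequential:", run_sequentially(4, 0.1))
    print("threaded:  ", run_in_threads(4, 0.1))
```

The threaded run should take roughly as long as one wait, because the waits overlap. Replacing fake_io with a pure computation such as the counter function above would be expected to remove that advantage.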

2. Create a process using multiprocessing

1. Three ways to create a process

A process is created in exactly the same way as a thread, except that threading.Thread is replaced by multiprocessing.Process. The multiprocessing module tries to keep its method names consistent with the threading module; for examples, refer to the threads section above. Only the first way, passing in a function, is shown here:


#!/usr/bin/python
#-*-coding:utf-8-*-
import multiprocessing, time
# Function executed by the child process
def run():
  i = 0
  while i<10000:
    print 'running'
    time.sleep(2)
    i += 1
if __name__ == '__main__':
  p = multiprocessing.Process(target=run)
  p.start()
  #p.join()
  # Print the process id of the child process
  print p.pid
  print 'master gone'
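One difference from threads that is easy to overlook is that a child process works on its own copy of the program's data. The sketch below, in Python 3, assumes a POSIX system where the "fork" start method is available; the names child and run_child are made up for illustration.

```python
import multiprocessing
import os

counter = 0  # module-level state, copied into the child at fork time


def child(queue):
    global counter
    counter += 1  # changes only the child's copy
    queue.put((os.getpid(), counter))


def run_child():
    # Assumption: "fork" start method (Linux and other POSIX systems)
    ctx = multiprocessing.get_context("fork")
    queue = ctx.Queue()
    p = ctx.Process(target=child, args=(queue,))
    p.start()
    child_pid, child_counter = queue.get(timeout=10)
    p.join()
    # The parent's counter is untouched by the child's increment
    return child_pid, child_counter, counter


if __name__ == "__main__":
    print(run_child())
```

Data has to travel back through an explicit channel such as the queue used here; a thread, by contrast, would have modified the shared counter directly.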

2. Create a process pool

The module also lets you create a group of processes at once and then assign tasks to them. For details, please refer to the manual. I have not studied this part much, so I will not go into it here.


pool = multiprocessing.Pool(processes=4)
pool.apply_async(func, args...)
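As a rough illustration of how the two calls above fit together, here is a minimal Python 3 sketch. The task function square and the helper square_all are hypothetical, and the "fork" start method is assumed (a POSIX system such as the Ubuntu machine used in this article).

```python
import multiprocessing


def square(x):
    """A stand-in task; any picklable module-level function would do."""
    return x * x


def square_all(numbers, processes=4):
    # Assumption: "fork" start method (Linux and other POSIX systems)
    ctx = multiprocessing.get_context("fork")
    with ctx.Pool(processes=processes) as pool:
        # apply_async returns immediately with an AsyncResult object
        pending = [pool.apply_async(square, (n,)) for n in numbers]
        # get() blocks until the corresponding task has finished
        return [r.get(timeout=10) for r in pending]


if __name__ == "__main__":
    print(square_all([1, 2, 3]))
```

The results are collected inside the with block because leaving it terminates the pool.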

3. Benefits of using processes

Processes run fully in parallel, free of the GIL, and can make full use of a multi-CPU, multi-core environment. They can also receive Linux signals, which, as you will see below, is very handy.

3. Case studies

The hypothetical task for this example is as follows. A main process starts several child processes to handle different tasks, and each child process may have its own threads for different IO work (as mentioned earlier, threads are still good at IO). The goal is to be able to send signals to these child processes and have them handled correctly. For example, when SIGTERM arrives, a child process should tell its threads to call it a day and then exit "gracefully". The problems to solve are: (1) how to capture signals in a subclassed Process object; (2) how to "exit gracefully". They are described separately below.

1. Subclass Process and capture signals

If you use the first process creation method (passing in a function), capturing signals is easy. The function passed to the process, called run in the example below, registers the handlers itself:


#!/usr/bin/python
#-*-coding:utf-8-*-
import multiprocessing, signal, time
# Signal handler: just print the signal number
def handler(signum, frame):
  print 'signal', signum
def run():
  # Register the handler for SIGTERM and SIGINT inside the child process
  signal.signal(signal.SIGTERM, handler)
  signal.signal(signal.SIGINT, handler)
  i = 0
  while i<10000:
    print 'running'
    time.sleep(2)
    i += 1
if __name__ == '__main__':
  p = multiprocessing.Process(target=run)
  p.start()
  #p.join()
  print p.pid
  print 'master gone'

This code is modified from the first creation method. It adds two signal.signal(...) calls, which tell the process to capture the SIGTERM and SIGINT signals, and a handler function that processes the signals when they arrive. Here we simply print the signal number.

Note that p.join() is commented out, which differs a little from the thread case. Once the new process starts running, the main process does not have to wait for it to finish and can go about its own business. When this code runs, it prints the process id of the child process. Enter kill -TERM <pid> with that id in another terminal, and the first terminal prints "signal 15".

However, passing in a function has one drawback: poor encapsulation. If the function gets a little more complicated, many global variables end up exposed. It is better to encapsulate the logic in a class, but then how do you register a handler that belongs to the class? The example above seems to allow only a global function, and the manual gives no example of handling signals in a class. In fact the solution is similar and easy. This post gave me the idea: http://stackoverflow.com/questions/6204443/python-signal-reading-return-from-signal-handler-function
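The manual kill step can also be driven from the parent process. The following is a hedged Python 3 sketch, assuming a POSIX system with the "fork" start method; the ready event and the terminate_gracefully helper are additions for illustration. It uses the child's exit code to tell whether the handler ran.

```python
import multiprocessing
import os
import signal
import time


def child(ready):
    state = {"alive": True}

    def handler(signum, frame):
        state["alive"] = False  # ask the loop below to stop

    signal.signal(signal.SIGTERM, handler)
    ready.set()  # tell the parent that the handler is installed
    while state["alive"]:
        time.sleep(0.05)
    # Returning normally gives the process exit code 0


def terminate_gracefully():
    # Assumption: "fork" start method (Linux and other POSIX systems)
    ctx = multiprocessing.get_context("fork")
    ready = ctx.Event()
    p = ctx.Process(target=child, args=(ready,))
    p.start()
    ready.wait(5)
    os.kill(p.pid, signal.SIGTERM)  # same effect as kill -TERM <pid>
    p.join(5)
    return p.exitcode


if __name__ == "__main__":
    print("child exit code:", terminate_gracefully())
```

An exit code of 0 means the child left its loop on its own. Without the handler, SIGTERM would kill the child and multiprocessing would report a negative exit code instead.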


class Master(multiprocessing.Process):
  def __init__(self):
    super(Master,self).__init__()
    signal.signal(signal.SIGTERM, self.handler)   # Register the signal handler
    self.live = 1
  # Signal handler
  def handler(self, signum, frame):
    print 'signal:',signum
    self.live = 0
  def run(self):
    print 'PID:',self.pid
    while self.live:
      print 'living...'
      time.sleep(2)

The method is straightforward. Register the signal handler in the constructor, and define a method named handler to act as the handler. This process class prints "living..." every 2 seconds. When SIGTERM is received, the handler changes the value of self.live. The loop in the run method ends when it sees this value become 0, and the process ends with it.
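For comparison, here is a sketch of the same class in Python 3, again assuming a POSIX system with the "fork" start method. It differs from the version above in one deliberate way: the handler is registered at the start of run, so that only the child process installs it. The ready event and the start_and_stop helper are illustrative additions.

```python
import multiprocessing
import os
import signal
import time

# Assumption: "fork" start method (Linux and other POSIX systems)
_ctx = multiprocessing.get_context("fork")


class Master(_ctx.Process):
    def __init__(self):
        super().__init__()
        self.live = True
        self.ready = _ctx.Event()  # lets the parent know when to send the signal

    def handler(self, signum, frame):
        self.live = False

    def run(self):
        # run() executes in the child, so the handler is installed there only
        signal.signal(signal.SIGTERM, self.handler)
        self.ready.set()
        while self.live:
            time.sleep(0.05)


def start_and_stop():
    m = Master()
    m.start()
    m.ready.wait(5)
    os.kill(m.pid, signal.SIGTERM)
    m.join(5)
    return m.exitcode


if __name__ == "__main__":
    print("exit code:", start_and_stop())
```

Registering in the constructor, as the article does, also works with fork because the child inherits the parent's handlers, but it installs the handler in the parent as well.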

2. Let the process exit gracefully

Here is the complete code for the hypothetical task. The main process starts one child process (by subclassing the Process class), and once started the child process creates two child threads to simulate the "producer-consumer" model. The two threads communicate through a queue. To make access to the queue mutually exclusive, it is natural to add a lock (a Condition object is similar to a Lock object, with the added ability to wait and notify). Each time round, the producer generates a random number, puts it in the queue, and then rests for a random time; the consumer takes one number from the queue each time. The main thread of the child process is responsible for receiving the signal, so that the whole process can end gracefully. The code is as follows:


#!/usr/bin/python
#-*-coding:utf-8-*-
import time, multiprocessing, signal, threading, random, Queue
class Master(multiprocessing.Process):
  def __init__(self):
    super(Master,self).__init__()
    signal.signal(signal.SIGTERM, self.handler)
    # This flag is passed into the threads to control whether they keep running.
    # A dict is used because it is mutable: the threads hold a reference to the same
    # object, so a change made in the handler is visible to them. Rebinding a plain
    # attribute (self.live = False) would not be seen by the threads.
    self.live = {'stat':True}
  def handler(self, signum, frame):
    print 'signal:',signum
    self.live['stat'] = 0  # Clear the flag to tell the child threads to "call it a day"
  def run(self):
    print 'PID:',self.pid
    cond = threading.Condition(threading.Lock())  # Condition object the child threads use to coordinate
    q = Queue.Queue()  # The shared queue
    sender = Sender(cond, self.live, q)  # Pass in the shared resources
    geter = Geter(cond, self.live, q)
    sender.start()  # Start the threads
    geter.start()
    signal.pause()  # The main thread sleeps until a signal arrives
    while threading.activeCount()-1:  # Once woken by the signal, count the threads still alive (excluding itself)
      time.sleep(2)  # Sleep and check again until all child threads have ended safely
      print 'checking live', threading.activeCount()
    print 'master gone'
class Sender(threading.Thread):
  def __init__(self, cond, live, queue):
    super(Sender, self).__init__(name='sender')
    self.cond = cond
    self.queue = queue
    self.live = live
  def run(self):
    cond = self.cond
    while self.live['stat']:  # Check the process-wide flag; keep running while it is true
      cond.acquire()  # Acquire the lock that guards the queue
      i = random.randint(0,100)
      self.queue.put(i,False)
      if not self.queue.full():
        print 'sender add:',i
      cond.notify()  # Wake up a thread waiting on the condition
      cond.release()  # Release the lock
      time.sleep(random.randint(1,3))
    print 'sender done'
class Geter(threading.Thread):
  def __init__(self, cond, live, queue):
    super(Geter, self).__init__(name='geter')
    self.cond = cond
    self.queue = queue
    self.live = live
  def run(self):
    cond = self.cond
    while self.live['stat']:
      cond.acquire()
      if not self.queue.empty():
        i = self.queue.get()
        print 'geter get:',i
      cond.wait(3)  # Wait up to 3 seconds for a notify from the sender
      cond.release()
      time.sleep(random.randint(1,3))
    print 'geter done'
if __name__ == '__main__':
  master = Master()
  master.start()  # Start the child process

Note that in the run method of Master, after sender.start() and geter.start(), the usual practice would be to call sender.join() and geter.join() so that the main thread waits for the child threads to end. This is where the join pitfall mentioned earlier lies: join blocks the main thread, and while it is blocked the main thread can no longer capture signals. When I first studied this, I thought my signal handler was written wrong. There is little discussion of this online; these two links explain it clearly: http://stackoverflow.com/questions/631441/interruptible-thread-join-in-python, http://www.gossamer-threads.com/lists/python/python/541403
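One commonly suggested way around this pitfall is to call join with a timeout in a loop, so that the main thread returns to the interpreter regularly. The sketch below shows that pattern for the thread part only, in Python 3. It swaps the dict flag for a threading.Event and drops the Condition, since the queue is already thread-safe; these are substitutions made for illustration, not the article's method. A time.sleep stands in for signal.pause(), and run_for is a made-up helper.

```python
import queue
import threading
import time


def producer(q, stop):
    n = 0
    while not stop.is_set():
        q.put(n)
        n += 1
        stop.wait(0.01)  # sleep, but wake up at once when stop is set


def consumer(q, stop, received):
    while not stop.is_set():
        try:
            received.append(q.get(timeout=0.05))
        except queue.Empty:
            pass  # nothing to read yet; re-check the stop flag


def run_for(seconds):
    q = queue.Queue()
    stop = threading.Event()
    received = []
    threads = [
        threading.Thread(target=producer, args=(q, stop), name="sender"),
        threading.Thread(target=consumer, args=(q, stop, received), name="geter"),
    ]
    for th in threads:
        th.start()
    time.sleep(seconds)  # stands in for signal.pause() in the article
    stop.set()           # what the SIGTERM handler would do
    for th in threads:
        while th.is_alive():
            th.join(0.1)  # timed join: control returns here every 0.1 s
    return received, [th.is_alive() for th in threads]


if __name__ == "__main__":
    items, alive = run_for(0.2)
    print(len(items), "items received; threads alive:", alive)
```

In a real process, stop.set() would be called from the signal handler, and the timed join loop would replace the activeCount polling used in the article.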

Reference:

Core Python Programming
Python manual

I hope this article is helpful to everyone's Python programming.