Python multi threading learning materials

  • 2020-04-02 09:35:50
  • OfStack

I. thread usage in Python:
There are two ways to use threads in Python: functions or classes to wrap thread objects.
1. Functional: call the start_new_thread() function in the thread module to generate a new thread. The following cases:
 
import time 
import thread 
def timer(no, interval): 
cnt = 0 
while cnt<10: 
print 'Thread:(%d) Time:%sn'%(no, time.ctime()) 
time.sleep(interval) 
cnt+=1 
thread.exit_thread() 

def test(): #Use thread.start_new_thread() to create 2 new threads 
thread.start_new_thread(timer, (1,1)) 
thread.start_new_thread(timer, (2,2)) 
if __name__=='__main__': 
test() 

The above example defines a thread function, the timer, that prints out 10 time records and exits at a time interval determined by the interval parameter. The first parameter of thread.start_new_thread(function, args[, kwargs]) is the thread function (the timer method in this example), the second parameter is the parameter passed to the thread function, which must be of tuple type, and kwargs is optional.
The thread can end by waiting for the thread to end naturally, or by calling the thread.exit() or thread.exit_thread() methods in the thread function.
2. Create a subclass of threading.thread to wrap a Thread object, as shown in the following example:
 
import threading 
import time 
class timer(threading.Thread): #The timer class is derived from the class threading.Thread 
def __init__(self, num, interval): 
threading.Thread.__init__(self) 
self.thread_num = num 
self.interval = interval 
self.thread_stop = False 
def run(self): #Overwrite run() method, put what you want the thread do here 
while not self.thread_stop: 
print 'Thread Object(%d), Time:%sn' %(self.thread_num, time.ctime()) 
time.sleep(self.interval) 
def stop(self): 
self.thread_stop = True 

def test(): 
thread1 = timer(1, 1) 
thread2 = timer(2, 2) 
thread1.start() 
thread2.start() 
time.sleep(10) 
thread1.stop() 
thread2.stop() 
return 
if __name__ == '__main__': 
test() 

Personally, I prefer the second approach, which is to create your own threading class and override the methods of the threading.thread class if necessary, so that you can customize the control of the threads.
Usage of threading.Thread class:
1. Call threading.Thread.
Threadname is the name of the thread
2. Run (), which usually needs to be overwritten, writes code to achieve the required functionality.
3, getName(), get the thread object name
4, setName(), set the name of the thread object
5, start(), start the thread
6, jion([timeout]), wait for another thread to finish before running.
SetDaemon (bool), which sets whether the child thread ends with the main thread, must be called before start(). The default is False.
8, isDaemon(), to determine whether the thread ends with the main thread.
9, isAlive(), check whether the thread is running.
In addition, the threading module itself provides a number of methods and other classes to help us better use and manage threads. See http://www.python.org/doc/2.5.2/lib/module-threading.html.


Suppose that two thread objects t1 and t2 both want to increment num=0 by 1, and both t1 and t2 modify num 10 times. The final result of num should be 20. But because it is multi-threaded access, it is possible to have the following situation: when num=0, t1 gets num=0. At this time, the system will dispatch t1 to the state of "sleeping" and convert t2 to the state of "running", and then num=0 will be obtained from t2 page. Then t2 adds 1 to the resulting value and assigns num, so that num is equal to 1. Then the system will dispatch t2 as "sleeping" and change t1 to "running". Thread t1 assigns to num its previous 0 plus 1. So, both t1 and t2 have done 1 plus 1, but the result is still num=1.

The above case describes one of the most common problems with multi-threading: data sharing. When multiple threads want to modify a Shared data, we need to synchronize data access.

1. Simple synchronization

The simplest synchronization mechanism is a lock. The lock object is created by the threading.RLock class. A thread can acquire a lock using the acquire() method of the lock so that the lock enters the "locked" state. Only one thread at a time can acquire the lock. If another thread when trying to get the lock will be system becomes "blocked" status, until the thread calls the lock with lock release () method to release the lock, the lock will enter the "unlocked" state. The thread in the "blocked" state receives a notification and has the right to acquire the lock. If multiple threads are in the "blocked" state, all threads will first lift the "blocked" state, and then the system selects one thread to acquire the lock, while other threads remain silent (" blocked ").
Thread modules and Lock objects in Python are low-level thread control tools provided by Python and are very simple to use. As shown in the following example:
 
import thread 
import time 
mylock = thread.allocate_lock() #Allocate a lock 
num=0 #Shared resource 
def add_num(name): 
global num 
while True: 
mylock.acquire() #Get the lock 
# Do something to the shared resource 
print 'Thread %s locked! num=%s'%(name,str(num)) 
if num >= 5: 
print 'Thread %s released! num=%s'%(name,str(num)) 
mylock.release() 
thread.exit_thread() 
num+=1 
print 'Thread %s released! num=%s'%(name,str(num)) 
mylock.release() #Release the lock. 
def test(): 
thread.start_new_thread(add_num, ('A',)) 
thread.start_new_thread(add_num, ('B',)) 
if __name__== '__main__': 
test() 

Python also provides an advanced thread-control library on top of thread, as mentioned earlier with threading. The Python threading module is a module built on top of the thread module, which exposes many of the attributes of the thread module. In thread module, python provides a user-level thread synchronization tool "Lock" object. In the threading module, python provides a variant of the Lock object: the RLock object. Internally, the RLock object maintains a Lock object, which is a reentrant object. For the Lock object, if a thread carries out a acquire operation twice in a row, the second acquire will suspend the thread because there is no release after the first acquire. This causes the Lock object to never be released, leaving the thread deadlocked. The RLock object allows a thread to acquire it multiple times, because the number of times a thread acquires is maintained within it through a counter variable. Moreover, each acquire operation must have a release operation corresponding to it. After all the release operations are completed, other threads can apply for the RLock object.

Let's take a look at how to synchronize using the threading RLock object.

 
import threading 
mylock = threading.RLock() 
num=0 
class myThread(threading.Thread): 
def __init__(self, name): 
threading.Thread.__init__(self) 
self.t_name = name 
def run(self): 
global num 
while True: 
mylock.acquire() 
print 'nThread(%s) locked, Number: %d'%(self.t_name, num) 
if num>=4: 
mylock.release() 
print 'nThread(%s) released, Number: %d'%(self.t_name, num) 
break 
num+=1 
print 'nThread(%s) released, Number: %d'%(self.t_name, num) 
mylock.release() 
def test(): 
thread1 = myThread('A') 
thread2 = myThread('B') 
thread1.start() 
thread2.start() 
if __name__== '__main__': 
test() 

We call code that modifies Shared data a "critical section." All "critical sections" must be enclosed between acquire and release of the same lock object.

2. Conditional synchronization

Locks provide only the most basic synchronization. If a "critical section" is accessed only when certain events occur, the Condition variable Condition is used.
Condition object is the wrapper of the Lock object. When the Condition object is created, its constructor needs a Lock object as a parameter. If there is no such parameter, Condition will create an Rlock object internally. On the Condition object, of course, acquire and release operations can also be called, because the internal Lock object itself supports these operations. But the value of Condition lies in the semantics of wait and notify that it provides.
How do conditional variables work? First, after a thread successfully obtains a condition variable, calling the wait() method of the condition variable causes the thread to release the lock and enter the "blocked" state, until another thread invokes the notify() method of the same condition variable to wake up the "blocked" thread. Calling the notifyAll() method of the condition variable wakes up all waiting threads.
A deadlock occurs when a program or thread is permanently blocked. So if you use synchronization mechanisms such as locks, condition variables, and so on, be sure to check carefully to prevent deadlock situations. The finally clause in the exception-handling mechanism is used to ensure the release of the lock for critical sections where exceptions may occur. A thread waiting for a condition variable must explicitly wake up with the notify() method or remain silent forever. Make sure that every call to the wait() method has a corresponding call to notify(), and of course, call the notifyAll() method just in case.


The problem of producer and consumer is a typical synchronization problem. Here are two different implementations.

1. Conditional variables
 
import threading 
import time 
class Producer(threading.Thread): 
def __init__(self, t_name): 
threading.Thread.__init__(self, name=t_name) 

def run(self): 
global x 
con.acquire() 
if x > 0: 
con.wait() 
else: 
for i in range(5): 
x=x+1 
print "producing..." + str(x) 
con.notify() 
print x 
con.release() 

class Consumer(threading.Thread): 
def __init__(self, t_name): 
threading.Thread.__init__(self, name=t_name) 
def run(self): 
global x 
con.acquire() 
if x == 0: 
print 'consumer wait1' 
con.wait() 
else: 
for i in range(5): 
x=x-1 
print "consuming..." + str(x) 
con.notify() 
print x 
con.release() 

con = threading.Condition() 
x=0 
print 'start consumer' 
c=Consumer('consumer') 
print 'start producer' 
p=Producer('producer') 

p.start() 
c.start() 
p.join() 
c.join() 
print x 

In the above example, in the initial state, the Consumer is in a wait state, and after the Producer has produced continuously (adding 1 to x) for 5 times, the waiting Consumer of notify is. The Consumer is awakened to consume (subtract 1 from x)

2. Synchronize the queue

The Queue object in Python also provides support for thread synchronization. Queue objects are used to implement FIFO queues formed by multiple producers and multiple consumers.
The producer stores the data in the queue, and the consumer takes the data out of the queue.
 
# producer_consumer_queue 
from Queue import Queue 
import random 
import threading 
import time 
#Producer thread 
class Producer(threading.Thread): 
def __init__(self, t_name, queue): 
threading.Thread.__init__(self, name=t_name) 
self.data=queue 
def run(self): 
for i in range(5): 
print "%s: %s is producing %d to the queue!n" %(time.ctime(), self.getName(), i) 
self.data.put(i) 
time.sleep(random.randrange(10)/5) 
print "%s: %s finished!" %(time.ctime(), self.getName()) 
#Consumer thread 
class Consumer(threading.Thread): 
def __init__(self, t_name, queue): 
threading.Thread.__init__(self, name=t_name) 
self.data=queue 
def run(self): 
for i in range(5): 
val = self.data.get() 
print "%s: %s is consuming. %d in the queue is consumed!n" %(time.ctime(), self.getName(), val) 
time.sleep(random.randrange(10)) 
print "%s: %s finished!" %(time.ctime(), self.getName()) 
#Main thread 
def main(): 
queue = Queue() 
producer = Producer('Pro.', queue) 
consumer = Consumer('Con.', queue) 
producer.start() 
consumer.start() 
producer.join() 
consumer.join() 
print 'All threads terminate!' 
if __name__ == '__main__': 
main() 


In the above example, the Producer produces a "product" at random times and puts it in a queue. The Consumer sees a "product" in the queue and consumes it. In this case, because the Producer produces more products than the Consumer consumes, the Consumer consumes only one product after the Producer produces several "products".

The Queue module implements a FIFO Queue that supports multiple producers and multiple consumers. Queues are useful when sharing information needs to be exchanged safely between multiple threads. The default length of Queue is infinite, but you can set the maxsize parameter of its constructor to set its length. The put method of Queue is inserted at the end of the Queue. The prototype of this method is:

Put (item [, block [, timeout]])

If the optional parameter block is true and the timeout is None (the default), the thread is blocked until the queue has an empty data cell. If timeout is greater than 0 and there is still no data unit available for the duration of the timeout, Full exception is thrown. Conversely, if the block parameter is false (timeout parameter is ignored), the item is immediately added to the free data cell, and if there is no free data cell, the Full exception is thrown.

The get method of a Queue takes data from the head of the Queue and takes the same parameters as the put method. If the block parameter is true and timeout is None (the default), the thread is blocked until there is data in the queue. If timeout is greater than 0, there is still no available data in timeout time, and the Empty exception is thrown. Conversely, if the block parameter is false (timeout parameter is ignored), the data in the queue is immediately fetched. If there is no available data at this point, an Empty exception will also be thrown.

Related articles: