Detailed explanation of message communication mechanism of Python concurrent programming thread

  • 2021-12-11 08:09:41
  • OfStack

Directory 1 Event Event 2 Condition3 Queue Queue 4 Summary 1

I have already introduced how to use create threads and start threads. I believe everyone will have such an idea that threads are nothing more than creating 1, and then start() Next, it's too simple.

However, we should know that in a real project, the examples we give in the actual scenario are much more complicated, and the execution of different threads may be sequential, or their execution may be conditional and controlled. If we only rely on the shallow knowledge we have learned before, it is far from enough.

Today, let's discuss how to control the trigger execution of threads under 1.

To realize the control of multiple threads, in fact, the message communication mechanism is in effect, which uses this mechanism to send instructions to tell threads when they can execute, when they can't execute and what to execute.

After my summary, there are roughly three communication methods in threads:

threading.Event

threading.Condition

queue.Queue

Next, let's discuss 11.

1 Event Event

Python provides a very simple communication mechanism Threading.Event General condition variables. Multiple threads can wait for an event to occur, after which all threads will be activated.

The use of Event is also super simple, just three functions


event = threading.Event()

#  Reset event So that all of the event Events are on standby 
event.clear()

#  Waiting to receive event Determines whether to block program execution 
event.wait()

#  Send event Directive, causing all settings for the event Thread execution of the event 
event.set()

Let's take an example.


import time
import threading

class MyThread(threading.Thread):
    def __init__(self, name, event):
        super().__init__()
        self.name = name
        self.event = event

    def run(self):
        print('Thread: {} start at {}'.format(self.name, time.ctime(time.time())))
        #  Wait event.set() After, you can proceed 
        self.event.wait()
        print('Thread: {} finish at {}'.format(self.name, time.ctime(time.time())))

threads = []
event = threading.Event()

#  Definition 5 Threads 
[threads.append(MyThread(str(i), event)) for i in range(1,5)]

#  Reset event , so that event.wait() Play a blocking role 
event.clear()

#  Start all threads 
[t.start() for t in threads]

print(' Wait 5s...')
time.sleep(5)

print(' Wake up all threads ...')
event.set()

Thread: 1 start at Sun May 13 20:38:08 2018
Thread: 2 start at Sun May 13 20:38:08 2018
Thread: 3 start at Sun May 13 20:38:08 2018
Thread: 4 start at Sun May 13 20:38:08 2018

 Wait 5s...

 Wake up all threads ...
Thread: 1 finish at Sun May 13 20:38:13 2018
Thread: 4 finish at Sun May 13 20:38:13 2018
Thread: 2 finish at Sun May 13 20:38:13 2018
Thread: 3 finish at Sun May 13 20:38:13 2018

It can be seen that when all threads start ( start() ), it is not finished, but it is all in self.event.wait() It's stopped. We need to pass event.set() To send execution instructions to all threads before executing.

2 Condition

Condition and Event are similar and not much different.

Similarly, Condition only needs to master a few functions.


cond = threading.Condition()

#  Similar lock.acquire()
cond.acquire()

#  Similar lock.release()
cond.release()

#  Waiting for the specified trigger will release the acquisition of the lock at the same time , Until it was notify To regain possession. 
cond.wait()

#  Send designation, trigger execution 
cond.notify()

Let's take a look at an interesting example of hide-and-seek on the Internet


import threading, time

class Hider(threading.Thread):
    def __init__(self, cond, name):
        super(Hider, self).__init__()
        self.cond = cond
        self.name = name

    def run(self):
        time.sleep(1)  # Make sure to run first Seeker Methods in 
        self.cond.acquire()
        print(self.name + ':  I have blindfolded my eyes ')
        self.cond.notify()
        self.cond.wait()
        print(self.name + ':  I found you oh  ~_~')
        self.cond.notify() 
        self.cond.release()
        print(self.name + ':  I won ')

class Seeker(threading.Thread):
    def __init__(self, cond, name):
        super(Seeker, self).__init__()
        self.cond = cond
        self.name = name
        
    def run(self):
        self.cond.acquire()
        self.cond.wait()
        print(self.name + ':  I've already hidden it. Come and find me quickly ')
        self.cond.notify()
        self.cond.wait()
        self.cond.release()
        print(self.name + ':  You found it, hey ~~~')
        
cond = threading.Condition()
seeker = Seeker(cond, 'seeker')
hider = Hider(cond, 'hider')
seeker.start()
hider.start()

Communicate through cond, block yourself and make the other party execute. Thus, sequential execution is achieved.
Look at the results


hider:    I have blindfolded my eyes 
seeker:   I've already hidden it. Come and find me quickly 
hider:    I found you  ~_~
hider:    I won 
seeker:   You found it, hey ~~~

3 Queue queue

The last one, queues, is the focus of this section, because it is the most frequently used in our daily development.

Perhaps the safest way to send data from one thread to another is to use queues in the queue library. Creates an Queue object shared by multiple threads that use the put() And get() Operation to send and get elements to the queue.

Similarly, for Queue, we only need to master a few functions.


from queue import Queue
# maxsize Default to 0 , unrestricted 
# 1 Dan >0 And the number of messages reaches the limit, q.put() Will also block 
q = Queue(maxsize=0)

#  Default blocking program, waiting for queue messages, and setting timeout time 
q.get(block=True, timeout=None)

#  Send a message: By default, the program will be blocked until there is a free place in the queue to put data 
q.put(item, block=True, timeout=None)

#  Waiting for all messages to be consumed 
q.join()

#  Notifies the queue that task processing has completed, and when all tasks are processed, join()  The blocking will be lifted 
q.task_done()

The following 3 methods, just know, 1 generally do not need to use


#  Query the number of messages in the current queue 
q.qsize()

#  Whether all queue messages have been consumed and returned  True/False
q.empty()

#  Check whether the message in the queue is full 
q.full()

The function will be 1 more than the previous one, and it also shows that its functions are richer from another aspect.

Let me give you an example of a teacher calling the roll.


# coding=utf-8
# /usr/bin/env python

'''
Author: wangbm
Email: wongbingming@163.com
Wechat: mrbensonwon
Blog: python-online.cn
 WeChat official account: Python Programming time 
date: 2020/9/20  Afternoon 7:30
desc: 
'''
__author__ = 'wangbm'
from queue import Queue
from threading import Thread
import time
class Student:
    def __init__(self, name):
        self.name = name

    def speak(self):
        print("{} : Here! ".format(self.name))

class Teacher:
    def __init__(self, queue):
        super().__init__()
        self.queue=queue

    def call(self, student_name):
        if student_name == "exit":
            print(" The roll call is over and the class begins ..")
        else:
            print(" Teacher: {} Are you here yet? ".format(student_name))
            #  Send a message and ask whose name 
        self.queue.put(student_name)

class CallManager(Thread):
    def __init__(self, queue):
        super().__init__()
        self.students = {}
        self.queue = queue

    def put(self, student):
        self.students.setdefault(student.name, student)

    def run(self):
        while True:
            #  Blocking programs, listening to teachers at all times, and receiving messages 
            student_name = queue.get()
            if student_name == "exit":
                break
            elif student_name in self.students:
                self.students[student_name].speak()
            else:
                print(" Teacher, our class, no  {}  This man ".format(student_name))

queue = Queue()
teacher = Teacher(queue=queue)
s1 = Student(name=" Xiao Ming ")
s2 = Student(name=" Xiaoliang ")
cm = CallManager(queue)
cm.put(s1)
cm.put(s2)
cm.start()
print(' Begin roll call ~')
teacher.call(' Xiao Ming ')
time.sleep(1)
teacher.call(' Xiaoliang ')
time.sleep(1)
teacher.call("exit")

The results are as follows

Start roll call ~
Teacher: Is Xiao Ming here yet?
Xiaoming: Here!
Teacher: Is Xiaoliang here yet?
Xiaoliang: Here!
The roll call is over and the class begins.

In fact, there is another very important method of queue, Queue.task_done ()

If we don't understand its principle, we are writing a program, and we are likely to get stuck.

When we fetch data from the queue using Queue. get (), it is important that the data is consumed normally.

If the data is not being consumed normally, Queue will assume that the task is still in progress, and if you use Queue. join (), you will be blocked, even if there are no messages in your queue.

So how to solve this problem of 1 straight blocking?

That is, after we normally consume the data, remember to call Queue.task_done () under 1, indicating that the task of queuing has ended.

When the task counter inside the queue is zero, the call to Queue. join () is no longer blocked.

To understand this process, refer to the custom thread pool example in http://python. iswbm. com/en/latest/c02/c02_06. html.

4 Summary 1

After learning the above three communication methods, we can easily find out Event And Condition It is a module provided by threading module, which is simple in principle and single in function. It can send True And Threading.Event0 Therefore, it can only be applied to some simple scenarios.

And Queue Is a more advanced module that can send any type of message, including strings, dictionaries, and so on. Its internal implementation actually references Condition Modules such as put And get Function, which is the blocking of its Condition The function is expanded, so the function is richer and can meet the practical application.

The above is Python concurrent programming thread message communication mechanism details, more information about Python concurrent thread message communication mechanism please pay attention to other related articles on this site!


Related articles: