Detailed explanation of message communication mechanism of Python concurrent programming thread
- 2021-12-11 08:09:41
- OfStack
I have already introduced how to use create threads and start threads. I believe everyone will have such an idea that threads are nothing more than creating 1, and then
start()
Next, it's too simple.
However, we should know that in a real project, the examples we give in the actual scenario are much more complicated, and the execution of different threads may be sequential, or their execution may be conditional and controlled. If we only rely on the shallow knowledge we have learned before, it is far from enough.
Today, let's discuss how to control the trigger execution of threads under 1.
To realize the control of multiple threads, in fact, the message communication mechanism is in effect, which uses this mechanism to send instructions to tell threads when they can execute, when they can't execute and what to execute.
After my summary, there are roughly three communication methods in threads:
threading.Event
threading.Condition
queue.Queue
Next, let's discuss 11.
1 Event Event
Python provides a very simple communication mechanism
Threading.Event
General condition variables. Multiple threads can wait for an event to occur, after which all threads will be activated.
The use of Event is also super simple, just three functions
event = threading.Event()
# Reset event So that all of the event Events are on standby
event.clear()
# Waiting to receive event Determines whether to block program execution
event.wait()
# Send event Directive, causing all settings for the event Thread execution of the event
event.set()
Let's take an example.
import time
import threading
class MyThread(threading.Thread):
def __init__(self, name, event):
super().__init__()
self.name = name
self.event = event
def run(self):
print('Thread: {} start at {}'.format(self.name, time.ctime(time.time())))
# Wait event.set() After, you can proceed
self.event.wait()
print('Thread: {} finish at {}'.format(self.name, time.ctime(time.time())))
threads = []
event = threading.Event()
# Definition 5 Threads
[threads.append(MyThread(str(i), event)) for i in range(1,5)]
# Reset event , so that event.wait() Play a blocking role
event.clear()
# Start all threads
[t.start() for t in threads]
print(' Wait 5s...')
time.sleep(5)
print(' Wake up all threads ...')
event.set()
Thread: 1 start at Sun May 13 20:38:08 2018
Thread: 2 start at Sun May 13 20:38:08 2018
Thread: 3 start at Sun May 13 20:38:08 2018
Thread: 4 start at Sun May 13 20:38:08 2018
Wait 5s...
Wake up all threads ...
Thread: 1 finish at Sun May 13 20:38:13 2018
Thread: 4 finish at Sun May 13 20:38:13 2018
Thread: 2 finish at Sun May 13 20:38:13 2018
Thread: 3 finish at Sun May 13 20:38:13 2018
It can be seen that when all threads start (
start()
), it is not finished, but it is all in
self.event.wait()
It's stopped. We need to pass
event.set()
To send execution instructions to all threads before executing.
2 Condition
Condition and Event are similar and not much different.
Similarly, Condition only needs to master a few functions.
cond = threading.Condition()
# Similar lock.acquire()
cond.acquire()
# Similar lock.release()
cond.release()
# Waiting for the specified trigger will release the acquisition of the lock at the same time , Until it was notify To regain possession.
cond.wait()
# Send designation, trigger execution
cond.notify()
Let's take a look at an interesting example of hide-and-seek on the Internet
import threading, time
class Hider(threading.Thread):
def __init__(self, cond, name):
super(Hider, self).__init__()
self.cond = cond
self.name = name
def run(self):
time.sleep(1) # Make sure to run first Seeker Methods in
self.cond.acquire()
print(self.name + ': I have blindfolded my eyes ')
self.cond.notify()
self.cond.wait()
print(self.name + ': I found you oh ~_~')
self.cond.notify()
self.cond.release()
print(self.name + ': I won ')
class Seeker(threading.Thread):
def __init__(self, cond, name):
super(Seeker, self).__init__()
self.cond = cond
self.name = name
def run(self):
self.cond.acquire()
self.cond.wait()
print(self.name + ': I've already hidden it. Come and find me quickly ')
self.cond.notify()
self.cond.wait()
self.cond.release()
print(self.name + ': You found it, hey ~~~')
cond = threading.Condition()
seeker = Seeker(cond, 'seeker')
hider = Hider(cond, 'hider')
seeker.start()
hider.start()
Communicate through cond, block yourself and make the other party execute. Thus, sequential execution is achieved.
Look at the results
hider: I have blindfolded my eyes
seeker: I've already hidden it. Come and find me quickly
hider: I found you ~_~
hider: I won
seeker: You found it, hey ~~~
3 Queue queue
The last one, queues, is the focus of this section, because it is the most frequently used in our daily development.
Perhaps the safest way to send data from one thread to another is to use queues in the queue library. Creates an Queue object shared by multiple threads that use the
put()
And
get()
Operation to send and get elements to the queue.
Similarly, for Queue, we only need to master a few functions.
from queue import Queue
# maxsize Default to 0 , unrestricted
# 1 Dan >0 And the number of messages reaches the limit, q.put() Will also block
q = Queue(maxsize=0)
# Default blocking program, waiting for queue messages, and setting timeout time
q.get(block=True, timeout=None)
# Send a message: By default, the program will be blocked until there is a free place in the queue to put data
q.put(item, block=True, timeout=None)
# Waiting for all messages to be consumed
q.join()
# Notifies the queue that task processing has completed, and when all tasks are processed, join() The blocking will be lifted
q.task_done()
The following 3 methods, just know, 1 generally do not need to use
# Query the number of messages in the current queue
q.qsize()
# Whether all queue messages have been consumed and returned True/False
q.empty()
# Check whether the message in the queue is full
q.full()
The function will be 1 more than the previous one, and it also shows that its functions are richer from another aspect.
Let me give you an example of a teacher calling the roll.
# coding=utf-8
# /usr/bin/env python
'''
Author: wangbm
Email: wongbingming@163.com
Wechat: mrbensonwon
Blog: python-online.cn
WeChat official account: Python Programming time
date: 2020/9/20 Afternoon 7:30
desc:
'''
__author__ = 'wangbm'
from queue import Queue
from threading import Thread
import time
class Student:
def __init__(self, name):
self.name = name
def speak(self):
print("{} : Here! ".format(self.name))
class Teacher:
def __init__(self, queue):
super().__init__()
self.queue=queue
def call(self, student_name):
if student_name == "exit":
print(" The roll call is over and the class begins ..")
else:
print(" Teacher: {} Are you here yet? ".format(student_name))
# Send a message and ask whose name
self.queue.put(student_name)
class CallManager(Thread):
def __init__(self, queue):
super().__init__()
self.students = {}
self.queue = queue
def put(self, student):
self.students.setdefault(student.name, student)
def run(self):
while True:
# Blocking programs, listening to teachers at all times, and receiving messages
student_name = queue.get()
if student_name == "exit":
break
elif student_name in self.students:
self.students[student_name].speak()
else:
print(" Teacher, our class, no {} This man ".format(student_name))
queue = Queue()
teacher = Teacher(queue=queue)
s1 = Student(name=" Xiao Ming ")
s2 = Student(name=" Xiaoliang ")
cm = CallManager(queue)
cm.put(s1)
cm.put(s2)
cm.start()
print(' Begin roll call ~')
teacher.call(' Xiao Ming ')
time.sleep(1)
teacher.call(' Xiaoliang ')
time.sleep(1)
teacher.call("exit")
The results are as follows
Start roll call ~
Teacher: Is Xiao Ming here yet?
Xiaoming: Here!
Teacher: Is Xiaoliang here yet?
Xiaoliang: Here!
The roll call is over and the class begins.
In fact, there is another very important method of queue, Queue.task_done ()
If we don't understand its principle, we are writing a program, and we are likely to get stuck.
When we fetch data from the queue using Queue. get (), it is important that the data is consumed normally.
If the data is not being consumed normally, Queue will assume that the task is still in progress, and if you use Queue. join (), you will be blocked, even if there are no messages in your queue.
So how to solve this problem of 1 straight blocking?
That is, after we normally consume the data, remember to call Queue.task_done () under 1, indicating that the task of queuing has ended.
When the task counter inside the queue is zero, the call to Queue. join () is no longer blocked.
To understand this process, refer to the custom thread pool example in http://python. iswbm. com/en/latest/c02/c02_06. html.
4 Summary 1
After learning the above three communication methods, we can easily find out
Event
And
Condition
It is a module provided by threading module, which is simple in principle and single in function. It can send
True
And
Threading.Event
0
Therefore, it can only be applied to some simple scenarios.
And
Queue
Is a more advanced module that can send any type of message, including strings, dictionaries, and so on. Its internal implementation actually references
Condition
Modules such as
put
And
get
Function, which is the blocking of its
Condition
The function is expanded, so the function is richer and can meet the practical application.
The above is Python concurrent programming thread message communication mechanism details, more information about Python concurrent thread message communication mechanism please pay attention to other related articles on this site!