Python multi threaded synchronization in four ways

  • 2020-06-01 10:05:44
  • OfStack

Critical resources are those resources that can only be accessed by one thread at a time. A typical example is a printer, which can only be used by one program at a time to perform the printing function, because multiple threads cannot operate at the same time. The code that accesses this part of resources is usually called a critical section.

Locking mechanism

The Lock class of threading is locked with the acquire function of this class and unlocked with the realease function


import threading
import time

class Num:
  def __init__(self):
    self.num = 0
    self.lock = threading.Lock()
  def add(self):
    self.lock.acquire()# Locks, locking the corresponding resource 
    self.num += 1
    num = self.num
    self.lock.release()# Unlocks and leaves the resource 
    return num

n = Num()
class jdThread(threading.Thread):
  def __init__(self,item):
    threading.Thread.__init__(self)
    self.item = item
  def run(self):
    time.sleep(2)
    value = n.add()# will num add 1 And output the original data and +1 Subsequent data 
    print(self.item,value)

for item in range(5):
  t = jdThread(item)
  t.start()
  t.join()# Make the thread 1 a 1 A perform 

When a thread calls the lock's acquire() method to acquire the lock, the lock enters the "locked" state. Only one thread at a time can acquire the lock. If at this point another thread tries to acquire the lock, the thread becomes in an "blocked" state, known as "synchronous blocking" (see basic concepts of multithreading).

The lock enters the "unlocked" state until the thread that owns it calls the lock's release() method to release it. The thread scheduler selects one of the threads in the synchronously blocked state to acquire the lock and put the thread into the run (running) state.

A semaphore

The semaphore also provides the acquire method and the release method. Whenever the acquire method is called, subtract 1 from the internal counter if it is greater than 0. If the internal counter is equal to 0, the thread is blocked until a thread calls the release method to update the internal counter to a value greater than 1.


import threading
import time
class Num:
  def __init__(self):
    self.num = 0
    self.sem = threading.Semaphore(value = 3)
    # allowed 3 Two threads access the resource at the same time 

  def add(self):
    self.sem.acquire()# Internal counter decrement 1
    self.num += 1
    num = self.num
    self.sem.release()# Internal counter add 1
    return num

n = Num()
class jdThread(threading.Thread):
  def __init__(self,item):
    threading.Thread.__init__(self)
    self.item = item
  def run(self):
    time.sleep(2)
    value = n.add()
    print(self.item,value)

for item in range(100):
  t = jdThread(item)
  t.start()
  t.join()

conditional

A conditional variable is a mechanism that allows threads to access data only after certain conditions have been met.

It USES the Condition class to do this, and since it can also be used like the lock mechanism, it also has the acquire and release methods, and it also has the wait, notify, notifyAll methods.


"""
1 A simple production consumer model, through the control of conditional variables of the number of products to increase or decrease, call 1 The secondary producer product is +1 , the call 1 Sub-consumer products will -1.
"""

"""
 use  Condition  Class to do this, and since it can also be used like a lock mechanism, it has one  acquire  Methods and  release  Method, and it still has 
wait .  notify .  notifyAll  Methods. 
"""

import threading
import queue,time,random

class Goods:# Product class 
  def __init__(self):
    self.count = 0
  def add(self,num = 1):
    self.count += num
  def sub(self):
    if self.count>=0:
      self.count -= 1
  def empty(self):
    return self.count <= 0

class Producer(threading.Thread):# The producer classes 
  def __init__(self,condition,goods,sleeptime = 1):#sleeptime=1
    threading.Thread.__init__(self)
    self.cond = condition
    self.goods = goods
    self.sleeptime = sleeptime
  def run(self):
    cond = self.cond
    goods = self.goods
    while True:
      cond.acquire()# Lock the resources 
      goods.add()
      print(" Product quantity :",goods.count," Producer thread ")
      cond.notifyAll()# Wake up all waiting threads -- "Is really about awakening the consumer process 
      cond.release()# Unlock the resource 
      time.sleep(self.sleeptime)

class Consumer(threading.Thread):# Consumer class 
  def __init__(self,condition,goods,sleeptime = 2):#sleeptime=2
    threading.Thread.__init__(self)
    self.cond = condition
    self.goods = goods
    self.sleeptime = sleeptime
  def run(self):
    cond = self.cond
    goods = self.goods
    while True:
      time.sleep(self.sleeptime)
      cond.acquire()# Lock the resources 
      while goods.empty():# If there is no product, make the thread wait 
        cond.wait()
      goods.sub()
      print(" Product quantity :",goods.count," Consumer thread ")
      cond.release()# Unlock the resource 

g = Goods()
c = threading.Condition()

pro = Producer(c,g)
pro.start()

con = Consumer(c,g)
con.start()

Synchronous queue

put method and task_done method, queue has 1 number of unfinished tasks, num, put, num+1, task, num-1. The task ends when all the tasks are completed.


import threading
import queue
import time
import random

'''
1. create 1 a  Queue.Queue()  , and then populates it with the data. 
2. The populated instance is passed to the thread class through inheritance  threading.Thread  The way to create. 
3. Take it out of the queue each time 1 And use the data and data in this thread  run  Method to perform the corresponding work. 
4. After this is done, use  queue.task_done()  The function sends to a queue where the task has completed 1 A signal. 
5. Queue execution  join  Operation, which essentially means waiting until the queue is empty before exiting the main program. 
'''

class jdThread(threading.Thread):
  def __init__(self,index,queue):
    threading.Thread.__init__(self)
    self.index = index
    self.queue = queue

  def run(self):
    while True:
      time.sleep(1)
      item = self.queue.get()
      if item is None:
        break
      print(" Serial number: ",self.index," task ",item," complete ")
      self.queue.task_done()#task_done Method makes the number of unfinished tasks -1

q = queue.Queue(0)
'''
 The initialization function accepts 1 Two Numbers for the capacity of the queue, if passed 
1 Delta is less than or equal to 0 By default, the capacity of the queue is assumed to be infinite .
'''
for i in range(2):
  jdThread(i,q).start()# Both threads complete the task simultaneously 

for i in range(10):
  q.put(i)#put Method makes the number of unfinished tasks +1


Related articles: