Summary of knowledge points of python thread priority queue

  • 2021-09-11 20:53:51
  • OfStack

Synchronized, thread-safe queue classes are provided in the Queue module of Python, including FIFO (first in first out) queue Queue, LIFO (last in first out) queue LifoQueue, and priority queue PriorityQueue.

1. Description

These queues all implement lock primitives, which can be used directly in multithreading, and queues can be used to synchronize between threads.

The common methods in the module are as follows:

Queue. qsize () Returns the size of the queue Queue. empty () returns True if the queue is empty, otherwise False Queue. full () If the queue is full, return True, otherwise False Queue. full corresponds to maxsize Queue. get ([block [, timeout]]) Get queue, timeout wait time Queue. get_nowait () Equivalent to Queue. get (False) Queue. put (item) write queue, timeout wait time Queue. put_nowait (item) Equivalent to Queue. put (item, False) Queue.task_done () After completing a task, the Queue.task_done () function sends a signal to the queue where the task has been completed Queue. join () actually means waiting until the queue is empty before doing anything else

2. Examples


#!/usr/bin/python3
import queue
import threading
import time
exitFlag = 0
class myThread (threading.Thread):
  def __init__(self, threadID, name, q):
    threading.Thread.__init__(self)
    self.threadID = threadID
    self.name = name
    self.q = q
  def run(self):
    print (" Open thread: " + self.name)
    process_data(self.name, self.q)
    print (" Exit thread: " + self.name)
def process_data(threadName, q):
  while not exitFlag:
    queueLock.acquire()
    if not workQueue.empty():
      data = q.get()
      queueLock.release()
      print ("%s processing %s" % (threadName, data))
    else:
      queueLock.release()
    time.sleep(1)
threadList = ["Thread-1", "Thread-2", "Thread-3"]
nameList = ["One", "Two", "Three", "Four", "Five"]
queueLock = threading.Lock()
workQueue = queue.Queue(10)
threads = []
threadID = 1
#  Create a new thread 
for tName in threadList:
  thread = myThread(threadID, tName, workQueue)
  thread.start()
  threads.append(thread)
  threadID += 1
#  Fill the queue 
queueLock.acquire()
for word in nameList:
  workQueue.put(word)
queueLock.release()
#  Wait for the queue to empty 
while not workQueue.empty():
  pass
#  Notify the thread that it is time to exit 
exitFlag = 1
#  Wait for all threads to complete 
for t in threads:
  t.join()
print (" Exit the main thread ")

Extension of knowledge points:

Problem

How to implement a queue sorted by priority? And every pop operation on this queue always returns the element with the highest priority

Solutions

The following class implements a simple priority queue using the heapq module:


import heapq

class PriorityQueue:
 def __init__(self):
 self._queue = []
 self._index = 0

 def push(self, item, priority):
 heapq.heappush(self._queue, (-priority, self._index, item))
 self._index += 1

 def pop(self):
 return heapq.heappop(self._queue)[-1]

Here's how to use it:


>>> class Item:
... def __init__(self, name):
...  self.name = name
... def __repr__(self):
...  return 'Item({!r})'.format(self.name)
...
>>> q = PriorityQueue()
>>> q.push(Item('foo'), 1)
>>> q.push(Item('bar'), 5)
>>> q.push(Item('spam'), 4)
>>> q.push(Item('grok'), 1)
>>> q.pop()
Item('bar')
>>> q.pop()
Item('spam')
>>> q.pop()
Item('foo')
>>> q.pop()
Item('grok')
>>>

Related articles: