Example thread pool implementation in Python

  • 2020-06-23 01:00:16
  • OfStack

The Python 3 standard library ships with its own thread pool, ThreadPoolExecutor, and its own process pool, ProcessPoolExecutor (both in the concurrent.futures module).

If you are using Python 2, you can install a third-party module called threadpool, which provides a thread pool. For process pooling, multiprocessing.Pool is included in the Python standard library.

You can also write your own thread pool, as in the following Python 2 example.


# coding:utf-8
 
import Queue
import threading
import sys
import time
import math
 
 
class WorkThread(threading.Thread):
  """Daemon worker thread that executes tasks pulled from a shared queue.

  Every queue item is expected to be a ``(func, args, kwargs)`` tuple. The
  worker starts itself on construction and polls the queue without
  blocking; while the queue is empty it sleeps with an exponential
  back-off that is capped at ``MAX_SLEEP_TIME`` seconds.

  Attributes:
    task_queue: the queue this worker takes its tasks from.
    idle: True from construction until a task is dequeued, and True again
      once the worker next finds the queue empty.
  """

  # First pause (seconds) when no task is available: 10 ms.
  INITIAL_SLEEP_TIME = 0.01
  # Upper bound (seconds) for the back-off pause, so a worker that has been
  # idle for a while still reacts promptly to newly submitted tasks.
  MAX_SLEEP_TIME = 1.0

  def __init__(self, task_queue):
    """Create the worker and start it immediately.

    Args:
      task_queue: queue of ``(func, args, kwargs)`` tuples to execute.
    """
    threading.Thread.__init__(self)
    self.daemon = True
    self.task_queue = task_queue
    # Initialise the flag BEFORE start(): run() may dequeue a task and mark
    # the worker busy right away, and a later assignment here would
    # overwrite that state.
    self.idle = True
    self.start()

  def run(self):
    """Execute queued tasks forever; back off while the queue is empty.

    An exception raised by a task is reported on stderr and the worker
    keeps running, so one failing task neither kills the thread nor
    leaves ``idle`` stuck at False.
    """
    import traceback  # local import: keeps this block self-contained

    sleep_time = self.INITIAL_SLEEP_TIME
    while True:
      try:
        # Take one task from the queue without blocking.
        task = self.task_queue.get(block=False)
      except Queue.Empty:
        self.idle = True
        time.sleep(sleep_time)
        # Double the pause for the next empty poll, up to the cap.
        sleep_time = min(sleep_time * 2, self.MAX_SLEEP_TIME)
        continue

      self.idle = False
      sleep_time = self.INITIAL_SLEEP_TIME
      try:
        func, args, kwargs = task
        func(*args, **kwargs)
      except Exception:
        # Report the failure but keep serving the remaining tasks.
        traceback.print_exc()
 
 
class ThreadPool:
  """Fixed-size pool of WorkThread workers fed from one bounded task queue.

  Attributes:
    max_queue_len: capacity passed to the task queue; a value <= 0 means
      the queue is unbounded.
    task_queue: queue of pending ``(func, args, kwargs)`` tasks.
    threads: the worker threads created for this pool.
  """

  def __init__(self, thread_num=10, max_queue_len=1000):
    """Create the task queue and start ``thread_num`` workers.

    Args:
      thread_num: number of worker threads to create.
      max_queue_len: maximum number of tasks allowed to wait in the queue.
    """
    self.max_queue_len = max_queue_len
    self.task_queue = Queue.Queue(max_queue_len)  # tasks waiting to run
    self.threads = []
    self.__create_pool(thread_num)

  def __create_pool(self, thread_num):
    """Create ``thread_num`` workers that all share the pool's queue."""
    for _ in range(thread_num):
      self.threads.append(WorkThread(self.task_queue))

  def add_task(self, func, *args, **kwargs):
    """Submit ``func(*args, **kwargs)`` for execution by a worker.

    Call isSafe() first to check that not too many tasks are already
    waiting, so that the submission is not rejected.

    Returns:
      The length of the task queue after the task was added.

    Raises:
      Queue.Full: if the queue is already at capacity. The put is
        non-blocking so that a full queue rejects the task instead of
        stalling the caller indefinitely.
    """
    self.task_queue.put((func, args, kwargs), block=False)
    return self.task_queue.qsize()

  def isSafe(self):
    """Return True while the backlog is below 90% of the queue capacity.

    An unbounded queue (``max_queue_len`` <= 0) can never reject a task,
    so it is always reported as safe.
    """
    if self.max_queue_len <= 0:
      return True
    return self.task_queue.qsize() < 0.9 * self.max_queue_len

  def wait_for_complete(self):
    """Block until the queue is empty and every worker reports idle.

    Both conditions are polled once per second. NOTE: the check relies on
    each worker's ``idle`` flag, which a worker updates only after it has
    dequeued a task, so a task taken in that short window can be missed.
    """
    # First, wait until no task is left in the queue.
    while not self.task_queue.empty():
      time.sleep(1)
    # Second, wait until every worker thread has become idle.
    while not all(th.idle for th in self.threads):
      time.sleep(1)
 
 
if __name__ == '__main__':
  def foo(a, b):
    print a + b
    time.sleep(0.01)
  thread_pool = ThreadPool(10, 100)
  ''' in Windows Failed the last test, Windows on Queue.Queue It's not thread safe '''
  size = 0
  for i in xrange(10000):
    try:
      size = thread_pool.add_task(foo, i, 2 * i)
    except Queue.Full:
      print 'queue full, queue size is ', size
  time.sleep(2)

Conclusion

That's it for this article about an example thread pool implementation in Python; I hope you found it helpful. If you are interested, you can continue with the related topics on this site. If you find any shortcomings, you are welcome to point them out in the comments. Thank you for your support!


Related articles: