Example thread pool implementation in Python
- 2020-06-23 01:00:16
- OfStack
Python 3's standard library ships with its own thread pool, ThreadPoolExecutor, and its own process pool, ProcessPoolExecutor (both in the concurrent.futures module).
If you are using Python 2, you can install a third-party module called threadpool, which provides a thread pool. For process pools, multiprocessing.Pool is included with Python.
You can also write your own thread pool, as shown below.
# coding:utf-8
import Queue
import threading
import sys
import time
import math
class WorkThread(threading.Thread):
    """Daemon worker thread that executes tasks taken from a shared queue.

    Each queue item is a ``(func, args, kwargs)`` tuple and is executed as
    ``func(*args, **kwargs)``.

    The ``idle`` attribute is ``True`` while the worker is sleeping because
    the queue was empty.  It is ``False`` from just before the worker polls
    the queue until it has finished the task it picked up (or has found the
    queue empty again).
    """

    def __init__(self, task_queue):
        """Create the worker and start it immediately.

        Args:
            task_queue: queue of ``(func, args, kwargs)`` tuples.  It is
                polled with ``get(block=False)`` and is expected to raise
                ``Queue.Empty`` when there is nothing to fetch.
        """
        super(WorkThread, self).__init__()
        self.daemon = True
        self.task_queue = task_queue
        # Every attribute must be assigned before start(): run() begins
        # executing concurrently and writes self.idle itself, so assigning
        # it afterwards could overwrite the value set by the running thread.
        self.idle = True
        self.start()

    def run(self):
        """Poll the queue forever, running each task that is found.

        When the queue is empty the worker sleeps, starting at 10 ms and
        doubling after every consecutive miss up to ``max_delay``; the delay
        is reset as soon as a task is found.  An exception raised by a task
        is printed and the worker carries on with the next task.
        """
        first_delay = 0.01  # first nap when there is nothing to do: 10 ms
        max_delay = 1.0     # upper bound so an idle worker stays responsive
        delay = first_delay
        while True:
            # Flag the worker as busy *before* polling.  This way there is
            # never a moment at which a task has already left the queue while
            # this worker still reports itself as idle.
            self.idle = False
            try:
                # Take one task from the queue without blocking.
                func, args, kwargs = self.task_queue.get(block=False)
            except Queue.Empty:
                self.idle = True
                time.sleep(delay)
                delay = min(delay * 2, max_delay)
                continue
            delay = first_delay
            try:
                func(*args, **kwargs)
            except Exception:
                # Report the failure but keep the worker alive; a dead worker
                # would never become idle again and would shrink the pool.
                print(sys.exc_info())
class ThreadPool(object):
    """Fixed-size pool of worker threads fed from one bounded task queue."""

    def __init__(self, thread_num=10, max_queue_len=1000):
        """Create the task queue and start the workers.

        Args:
            thread_num: number of worker threads to create.
            max_queue_len: maximum number of waiting tasks; it is passed to
                ``Queue.Queue`` as its size, so a value <= 0 means unbounded.
        """
        self.max_queue_len = max_queue_len
        self.task_queue = Queue.Queue(max_queue_len)  # tasks waiting to run
        self.threads = []
        self.__create_pool(thread_num)

    def __create_pool(self, thread_num):
        """Create ``thread_num`` workers that all share the task queue."""
        for _ in range(thread_num):
            self.threads.append(WorkThread(self.task_queue))

    def add_task(self, func, *args, **kwargs):
        """Queue ``func(*args, **kwargs)`` for execution by a worker.

        It is best to call ``isSafe()`` first to check that not too many
        tasks are already waiting, because a task submitted to a full queue
        is rejected.

        Returns:
            The length of the task queue after the task was added.

        Raises:
            Queue.Full: if the queue already holds ``max_queue_len`` tasks.
        """
        # put_nowait() rejects the task when the queue is full.  A plain
        # put() would block instead, and Queue.Full would never be raised.
        self.task_queue.put_nowait((func, args, kwargs))
        return self.task_queue.qsize()

    def isSafe(self):
        """Return True while the waiting tasks are well below the limit.

        "Well below" means fewer than 90% of ``max_queue_len``.  A queue
        created with ``max_queue_len <= 0`` has no limit and is always safe.
        """
        if self.max_queue_len <= 0:
            return True
        return self.task_queue.qsize() < 0.9 * self.max_queue_len

    def wait_for_complete(self, poll_interval=1):
        """Block until every task handed to the pool has been processed.

        Args:
            poll_interval: seconds to sleep between two checks.
        """
        # First, the waiting queue has to drain.
        while not self.task_queue.empty():
            time.sleep(poll_interval)
        # Second, every worker has to be idle.  A worker that is no longer
        # alive cannot be running a task, so it is not waited for either;
        # otherwise a dead worker would block this call forever.
        while not all(th.idle or not th.is_alive() for th in self.threads):
            time.sleep(poll_interval)
if __name__ == '__main__':
    def foo(a, b):
        """Demo task: print the sum of both arguments, then pause 10 ms."""
        print(a + b)
        time.sleep(0.01)

    thread_pool = ThreadPool(10, 100)
    # NOTE(review): the original author reported that this test failed on
    # Windows and attributed it to Queue.Queue not being thread safe there;
    # that explanation is unverified.
    size = 0
    for i in range(10000):
        # Keep retrying the same task until it is accepted, so that a full
        # queue delays the task instead of silently dropping it.
        while True:
            try:
                size = thread_pool.add_task(foo, i, 2 * i)
                break
            except Queue.Full:
                # 'size' is the queue length reported by the last successful
                # add_task() call.
                print('%s %s' % ('queue full, queue size is ', size))
                time.sleep(2)
    # The workers are daemon threads and die with the main thread, so wait
    # for the submitted tasks to finish before the script exits.
    thread_pool.wait_for_complete()
Conclusion
That's all for this article on the example thread pool code implemented in Python; I hope you found it helpful. Interested readers can continue with the other related topics on this site. If anything is lacking, you are welcome to point it out in the comments. Thank you for your support!