Implement a simple thread pool with Python

  • 2020-05-05 11:29:21
  • OfStack

What is a thread pool?

In object-oriented programming, creating and destroying objects takes time, because each new object needs memory and possibly other resources. This is especially true in Java, where the virtual machine tracks every object so that it can be garbage collected once it is no longer in use. One way to make a service program more efficient is therefore to create and destroy as few objects as possible, especially objects that consume a lot of resources. Reusing existing objects to do the work is the key problem to solve, and it is the reason "resource pooling" techniques exist.

As I understand it, a thread pool is a unit that holds a number of threads together with a task queue. During execution, the limited set of threads in the pool works through the tasks in the queue. The benefit is that you don't need to create a thread for every task: by the time you would create the 100th thread for the 100th task, the first 50 threads may well have finished already. Reusing threads to run tasks therefore reduces the system resource overhead.
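
To make the reuse concrete, here is a minimal sketch in Python 3 that uses the standard library's concurrent.futures.ThreadPoolExecutor rather than the hand-written pool shown later in this article. The task function handle and the numbers 20 and 4 are made up for illustration. It runs 20 tasks and records which threads executed them, so you can see that no more than 4 threads are ever involved.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def handle(task_id):
    """Hypothetical task: pretend to work, then report which thread ran it."""
    time.sleep(0.01)
    return task_id, threading.current_thread().name


def run_tasks(task_count=20, pool_size=4):
    """Run task_count tasks on a pool of at most pool_size threads."""
    with ThreadPoolExecutor(max_workers=pool_size) as pool:
        # map() hands the tasks to the pool and yields results in input order.
        results = list(pool.map(handle, range(task_count)))
    thread_names = {name for _, name in results}
    return results, thread_names


if __name__ == "__main__":
    results, thread_names = run_tasks()
    print("tasks completed:", len(results))
    print("distinct threads used:", len(thread_names))
```

The number of distinct threads never exceeds pool_size, however many tasks are submitted.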

A rough analogy: 100 computer cases need to be moved from the first floor to the second. You don't have to call in 100 people to help; 10 or 20 are enough. Each person can be assigned 10 or 5 cases, or whoever moves faster simply carries more, until the job is done. (The analogy goes something like that...)
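
The "whoever moves faster carries more" variant of the analogy can be sketched as follows (Python 3). The function name move_boxes and the mover names are hypothetical. All movers pull from one shared queue, so the split between them is decided at run time instead of being fixed in advance.

```python
import queue
import threading
from collections import Counter


def move_boxes(box_count=100, mover_count=10):
    """Let mover_count threads carry box_count boxes; return boxes per mover."""
    boxes = queue.Queue()
    for box in range(box_count):
        boxes.put(box)

    carried = Counter()
    lock = threading.Lock()  # protects the shared Counter

    def mover(name):
        while True:
            try:
                boxes.get_nowait()  # grab the next box, if any is left
            except queue.Empty:
                return  # nothing left to carry, so this mover is done
            with lock:
                carried[name] += 1

    movers = [
        threading.Thread(target=mover, args=("mover-%d" % i,))
        for i in range(mover_count)
    ]
    for t in movers:
        t.start()
    for t in movers:
        t.join()
    return carried


if __name__ == "__main__":
    for name, count in sorted(move_boxes().items()):
        print(name, "carried", count, "boxes")
```

The per-mover counts vary from run to run, but they always add up to box_count.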

Anyway, that is my general understanding of thread pools. So how do you implement one in Python?

The code is as follows:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# ref_blog: http://www.open-open.com/home/space-5679-do-blog-id-3247.html
import queue
import threading
import time


class WorkManager(object):
    """Manages the task queue and the pool of worker threads."""

    def __init__(self, work_num=1000, thread_num=2):
        self.work_queue = queue.Queue()
        self.threads = []
        self.__init_work_queue(work_num)
        self.__init_thread_pool(thread_num)

    def __init_thread_pool(self, thread_num):
        """Create the worker threads; each one starts as soon as it is created."""
        for i in range(thread_num):
            self.threads.append(Work(self.work_queue))

    def __init_work_queue(self, jobs_num):
        """Fill the work queue with jobs_num sample jobs."""
        for i in range(jobs_num):
            self.add_job(do_job, i)

    def add_job(self, func, *args):
        """Put a job on the queue; Queue does the locking internally."""
        self.work_queue.put((func, list(args)))

    def check_queue(self):
        """Return the approximate number of jobs still waiting in the queue."""
        return self.work_queue.qsize()

    def wait_allcomplete(self):
        """Wait for all worker threads to finish running."""
        for item in self.threads:
            if item.is_alive():
                item.join()


class Work(threading.Thread):
    def __init__(self, work_queue):
        threading.Thread.__init__(self)
        self.work_queue = work_queue
        self.start()

    def run(self):
        # Loop until the queue is empty, then let the thread exit.
        while True:
            try:
                # Non-blocking get: raises queue.Empty when no jobs are left.
                do, args = self.work_queue.get(block=False)
            except queue.Empty:
                break
            try:
                do(args)
            except Exception as e:
                # Report a failing job but keep the worker alive.
                print(str(e))
            finally:
                self.work_queue.task_done()  # Mark this job as processed.


# The specific job to run
def do_job(args):
    print(args)
    time.sleep(0.1)  # Simulate processing time
    print(threading.current_thread(), list(args))


if __name__ == '__main__':
    start = time.time()
    work_manager = WorkManager(10, 2)  # or work_manager = WorkManager(10000, 20)
    work_manager.wait_allcomplete()
    end = time.time()
    print("cost all time: %s" % (end - start))

The code is straightforward.

There are only two classes in the code: WorkManager and Work. The former, as its name suggests, is a manager that looks after the thread pool and the task queue, while the latter is an individual worker thread.

The overall logic is this: WorkManager is given a number of tasks and a number of threads, and each thread then keeps fetching tasks from the task queue and running them until the queue is empty. Queue also has a synchronization mechanism built in (which I have not studied yet).
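
The synchronization inside the queue can be illustrated with a small Python 3 sketch. It is a variation on the article's code, not a copy of it: the workers use a blocking get() and are shut down with a None sentinel, and the function name square_all is hypothetical. put() and get() are safe to call from several threads at once, while task_done() and join() let the producer wait until every queued item has been processed.

```python
import queue
import threading


def square_all(numbers, worker_count=3):
    """Square every number using worker threads fed from one queue.Queue."""
    tasks = queue.Queue()
    results = []
    results_lock = threading.Lock()  # a plain list needs its own lock

    def worker():
        while True:
            item = tasks.get()  # blocks until an item is available
            try:
                if item is None:  # sentinel: no more work for this worker
                    return
                with results_lock:
                    results.append(item * item)
            finally:
                tasks.task_done()  # lets tasks.join() account for this item

    threads = [threading.Thread(target=worker) for _ in range(worker_count)]
    for t in threads:
        t.start()
    for n in numbers:
        tasks.put(n)
    for _ in threads:
        tasks.put(None)  # one sentinel per worker
    tasks.join()  # returns once task_done() was called for every put()
    for t in threads:
        t.join()
    return sorted(results)


if __name__ == "__main__":
    print(square_all([1, 2, 3, 4]))
```

Because the workers block in get() instead of polling, jobs can be added after the threads have started without any worker exiting early.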

As for what this thread pool is good for: it does not fit my original purpose, because I need to start and stop threads from a web page, and this pool only seems suited to running a batch of tasks concurrently. Still, while it doesn't help with thread control, it works well for concurrent execution, for example in the page-fetching part of a web crawler.
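
For the thread-control case, one common approach is a worker that checks a threading.Event and exits when asked to. The Python 3 sketch below is only an assumption about what such control could look like; the class StoppableWorker and its parameters are hypothetical and are not part of the pool above.

```python
import threading
import time


class StoppableWorker(threading.Thread):
    """Hypothetical worker that repeats a job until stop() is called."""

    def __init__(self, job, interval=0.01):
        super().__init__()
        self.job = job
        self.interval = interval
        self._stop_requested = threading.Event()

    def run(self):
        while not self._stop_requested.is_set():
            self.job()
            # wait() acts as a sleep that ends early when stop() is called.
            self._stop_requested.wait(self.interval)

    def stop(self):
        """Ask the thread to exit after its current job."""
        self._stop_requested.set()


if __name__ == "__main__":
    ticks = []
    worker = StoppableWorker(lambda: ticks.append(time.time()))
    worker.start()   # e.g. triggered by a "start" button on the page
    time.sleep(0.05)
    worker.stop()    # e.g. triggered by a "stop" button on the page
    worker.join()
    print("job ran", len(ticks), "times")
```

The thread is never killed from outside; it only stops at the next check of the event, so a long-running job delays the shutdown.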

