C++11 simple method to implement thread pool

  • 2020-11-20 06:12:16
  • OfStack

What is a thread pool

A thread pool is a form of multithreaded processing that adds tasks to a queue and then starts them automatically after the thread is created. Thread pool threads are background threads. Each thread USES the default stack size, runs at the default priority, and is in a multithreaded unit. If a thread is idle in managed code (such as waiting for an event), the thread pool inserts another worker thread to keep all processors busy. If all the thread pool threads remain busy at all times, but the queue contains pending work, the thread pool will create another worker thread after a period of time but the number of threads will never exceed the maximum. Threads exceeding the maximum can queue, but they do not start until the other threads have completed.

What are the disadvantages of not using thread pools

If you create too many threads, you will waste a certain amount of resources, and some threads will be underused.
Destroying too many threads will cause you to waste time creating them again later.
Creating threads that are too slow will result in long waits and poor performance.
The destroy thread is too slow and causes other thread resources to starve.

Components of a thread pool

1. Thread pool Manager (ThreadPoolManager) : Used to create and manage thread pools
2. Worker thread (WorkThread) : The thread in the thread pool
3. Task interface (Task) : The interface that each task must implement for the worker thread to schedule the execution of the task.
4. Task queue: Used to store tasks that have not been processed. Provides a buffering mechanism

Now let's look directly at the code implementation

ThreadPoolManage.hpp


#pragma once
#include <thread>
#include <vector>
#include <queue>
#include <condition_variable>
#include <mutex>
/*
*  abstract 1 A task   Expand according to your own needs 
*/
class AbsTask 
{
public:
 AbsTask() = default;
 virtual ~AbsTask() = default;
public:
 virtual void run() = 0;
};
template<class _Ty>
class ThreadPoolManage
{
public:
 ThreadPoolManage(unsigned int nMaxThread)
 :mMaxThreadNum(nMaxThread)
 , mThreadStatus(false)
 {
 // Threads are created at startup 
 auto maxNum = std::thread::hardware_concurrency();
 // Gets the current operating system CPU Number of cores   Set according to the number of cores   Maximum number of concurrent worker threads 
 mMaxThreadNum = mMaxThreadNum > maxNum ? maxNum : mMaxThreadNum;
 // Create a worker thread pool 
 for (auto i = 0; i < mMaxThreadNum; i++)
 {
  mWorkers.emplace_back([this] {
  while (true)
  {
   std::unique_lock<std::mutex> lock(this->mQueue_mutex);
   this->mCondition.wait(lock, [this]() {return this->mThreadStatus || !this->mTasks.empty(); });
   if (this->mThreadStatus && this->mTasks.empty()) 
   { 
   return; 
   }
   // Get the task for the queue header 
   auto task = std::move(this->mTasks.front());
   // Tasks out of the team 
   this->mTasks.pop();
   // Perform the work 
   task.run();
  }
  });
 }
 }
 ~ThreadPoolManage()
 {

 {
  std::unique_lock<std::mutex> lock(this->mQueue_mutex);
  this->mThreadStatus = true;
 }
 // Tell all threads to get to work   Then quit 
 this->mCondition.notify_all();
 // Wait for all threads to finish working 
 for (std::thread& worker : this->mWorkers) 
 {
  if (worker.joinable()) {
  worker.join();
  }
 }
 }
 /*
 *  Add tasks to the task queue 
 */
 void addTask(_Ty& task)
 {
 std::unique_lock<std::mutex> lock(this->mQueue_mutex);
 if (mThreadStatus) {
  throw std::runtime_error("workers stop");
 }
 mTasks.emplace(std::move(task));
 mCondition.notify_one();
 }
private:
 /*
 *  Worker thread pool 
 */
 std::vector<std::thread> mWorkers;
 /*
 *  Task queue 
 */
 std::queue<_Ty> mTasks;
 /*
  Maximum number of concurrent worker threads 
 */
 unsigned int mMaxThreadNum;
 /*
  Condition variables,   Controls the working state of threads in the thread pool 
 */
 std::condition_variable mCondition;
 /*
 *  Worker thread lock 
 */
 std::mutex mQueue_mutex;
 /*
 *  Switches to control threads  false  Continue to work  true  Out of the thread 
 */
 bool mThreadStatus;

};

The calling code

main.cpp


#include <iostream>
#include <chrono>
#include "ThreadPool.hpp"
class Task :public AbsTask
{
public:
 void run() override
 {
 std::this_thread::sleep_for(std::chrono::seconds(1));
 std::cout << "works ...... " << std::this_thread::get_id() << std::endl;
 }
};
int main()
{
 ThreadPoolManage<Task> ThreadPool(8);
 for (size_t i = 0; i < 256; i++)
 {
 Task task;
 ThreadPool.addTask(task);
 }
 std::cin.get();
 system("pause");
}

Related articles: