C++11 simple method to implement thread pool
- 2020-11-20 06:12:16
- OfStack
What is a thread pool
A thread pool is a form of multithreaded processing that adds tasks to a queue and then starts them automatically after the thread is created. Thread pool threads are background threads. Each thread USES the default stack size, runs at the default priority, and is in a multithreaded unit. If a thread is idle in managed code (such as waiting for an event), the thread pool inserts another worker thread to keep all processors busy. If all the thread pool threads remain busy at all times, but the queue contains pending work, the thread pool will create another worker thread after a period of time but the number of threads will never exceed the maximum. Threads exceeding the maximum can queue, but they do not start until the other threads have completed.
What are the disadvantages of not using thread pools
If you create too many threads, you will waste a certain amount of resources, and some threads will be underused.
Destroying too many threads will cause you to waste time creating them again later.
Creating threads that are too slow will result in long waits and poor performance.
The destroy thread is too slow and causes other thread resources to starve.
Components of a thread pool
1. Thread pool Manager (ThreadPoolManager) : Used to create and manage thread pools
2. Worker thread (WorkThread) : The thread in the thread pool
3. Task interface (Task) : The interface that each task must implement for the worker thread to schedule the execution of the task.
4. Task queue: Used to store tasks that have not been processed. Provides a buffering mechanism
Now let's look directly at the code implementation
ThreadPoolManage.hpp
#pragma once
#include <thread>
#include <vector>
#include <queue>
#include <condition_variable>
#include <mutex>
/*
* abstract 1 A task Expand according to your own needs
*/
class AbsTask
{
public:
AbsTask() = default;
virtual ~AbsTask() = default;
public:
virtual void run() = 0;
};
template<class _Ty>
class ThreadPoolManage
{
public:
ThreadPoolManage(unsigned int nMaxThread)
:mMaxThreadNum(nMaxThread)
, mThreadStatus(false)
{
// Threads are created at startup
auto maxNum = std::thread::hardware_concurrency();
// Gets the current operating system CPU Number of cores Set according to the number of cores Maximum number of concurrent worker threads
mMaxThreadNum = mMaxThreadNum > maxNum ? maxNum : mMaxThreadNum;
// Create a worker thread pool
for (auto i = 0; i < mMaxThreadNum; i++)
{
mWorkers.emplace_back([this] {
while (true)
{
std::unique_lock<std::mutex> lock(this->mQueue_mutex);
this->mCondition.wait(lock, [this]() {return this->mThreadStatus || !this->mTasks.empty(); });
if (this->mThreadStatus && this->mTasks.empty())
{
return;
}
// Get the task for the queue header
auto task = std::move(this->mTasks.front());
// Tasks out of the team
this->mTasks.pop();
// Perform the work
task.run();
}
});
}
}
~ThreadPoolManage()
{
{
std::unique_lock<std::mutex> lock(this->mQueue_mutex);
this->mThreadStatus = true;
}
// Tell all threads to get to work Then quit
this->mCondition.notify_all();
// Wait for all threads to finish working
for (std::thread& worker : this->mWorkers)
{
if (worker.joinable()) {
worker.join();
}
}
}
/*
* Add tasks to the task queue
*/
void addTask(_Ty& task)
{
std::unique_lock<std::mutex> lock(this->mQueue_mutex);
if (mThreadStatus) {
throw std::runtime_error("workers stop");
}
mTasks.emplace(std::move(task));
mCondition.notify_one();
}
private:
/*
* Worker thread pool
*/
std::vector<std::thread> mWorkers;
/*
* Task queue
*/
std::queue<_Ty> mTasks;
/*
Maximum number of concurrent worker threads
*/
unsigned int mMaxThreadNum;
/*
Condition variables, Controls the working state of threads in the thread pool
*/
std::condition_variable mCondition;
/*
* Worker thread lock
*/
std::mutex mQueue_mutex;
/*
* Switches to control threads false Continue to work true Out of the thread
*/
bool mThreadStatus;
};
The calling code
main.cpp
#include <iostream>
#include <chrono>
#include "ThreadPool.hpp"
class Task :public AbsTask
{
public:
void run() override
{
std::this_thread::sleep_for(std::chrono::seconds(1));
std::cout << "works ...... " << std::this_thread::get_id() << std::endl;
}
};
int main()
{
ThreadPoolManage<Task> ThreadPool(8);
for (size_t i = 0; i < 256; i++)
{
Task task;
ThreadPool.addTask(task);
}
std::cin.get();
system("pause");
}