A concise C++11-based thread pool whose tasks can take any number of arguments

  • 2020-06-15 09:59:40
  • OfStack

C++11 added the thread library, ending the era in which the standard library had no concurrency support. However, C++'s multithreading support is still fairly low-level; anything slightly more advanced, such as a thread pool or a semaphore, you have to implement yourself. Thread pools come up in interviews all the time, and the stock answer is: "Maintain a task queue and a thread queue, then take a task, assign it to a thread, and repeat." That sounds fine, but when you sit down to write the program, things go wrong.

Enough talk; first the implementation, then the discussion. (Don't talk, show me your code!)

Code implementation


#pragma once
#ifndef THREAD_POOL_H
#define THREAD_POOL_H

#include <vector>
#include <queue>
#include <thread>
#include <atomic>
#include <condition_variable>
#include <future>
#include <functional>
#include <stdexcept>

namespace std
{
#define MAX_THREAD_NUM 256

// Thread pool: you can commit variadic functions or lambda expressions for execution and fetch the return value.
// Non-static class member functions are not supported directly; static members, global functions and functors (operator()) are.
class threadpool
{
  using Task = std::function<void()>;
  // worker threads
  std::vector<std::thread> pool;
  // task queue
  std::queue<Task> tasks;
  // synchronization
  std::mutex m_lock;
  // condition variable for blocking
  std::condition_variable cv_task;
  // whether commits are closed
  std::atomic<bool> stoped;
  // number of idle threads
  std::atomic<int> idlThrNum;

public:
  inline threadpool(unsigned short size = 4) :stoped{ false }
  {
    idlThrNum = size < 1 ? 1 : size;
    for (size = 0; size < idlThrNum; ++size)
    {  // create the worker threads
      pool.emplace_back(
        [this]
        { //  Worker thread function 
          while(!this->stoped)
          {
            std::function<void()> task;
            {  // acquire one task to execute
              std::unique_lock<std::mutex> lock{ this->m_lock }; // unique_lock's advantage over lock_guard: you can unlock() and lock() at any time
              this->cv_task.wait(lock,
                [this] {
                  return this->stoped.load() || !this->tasks.empty();
                }
              ); // wait until there is a task
              if (this->stoped && this->tasks.empty())
                return;
              task = std::move(this->tasks.front()); // take one task
              this->tasks.pop();
            }
            idlThrNum--;
            task();
            idlThrNum++;
          }
        }
      );
    }
  }
  inline ~threadpool()
  {
    stoped.store(true);
    cv_task.notify_all(); // wake all threads so they can exit
    for (std::thread& thread : pool) {
      //thread.detach(); // let the thread fend for itself
      if(thread.joinable())
        thread.join(); // wait for the task to finish; assumes each thread will eventually complete
    }
  }

public:
  // Commit a task.
  // Calling .get() on the returned future waits for the task to complete and yields the return value.
  // There are two ways to commit a class member function:
  //   one is to use bind:   .commit(std::bind(&Dog::sayHello, &dog));
  //   one is to use mem_fn: .commit(std::mem_fn(&Dog::sayHello), &dog)
  template<class F, class... Args>
  auto commit(F&& f, Args&&... args) ->std::future<decltype(f(args...))>
  {
    if (stoped.load())  // the pool has been stopped
      throw std::runtime_error("commit on ThreadPool is stopped.");

    using RetType = decltype(f(args...)); // the return type of f; equivalently typename std::result_of<F(Args...)>::type
    auto task = std::make_shared<std::packaged_task<RetType()> >(
      std::bind(std::forward<F>(f), std::forward<Args>(args)...)
      );  // bind f and its arguments into a zero-argument callable, wrapped in a packaged_task
    std::future<RetType> future = task->get_future();
    {  // add the task to the queue
      std::lock_guard<std::mutex> lock{ m_lock }; // lock the current scope; lock_guard is an RAII (stack) wrapper for the mutex: lock() on construction, unlock() on destruction
      tasks.emplace(
        [task]()
        { // push(Task{...})
          (*task)();
        }
      );
    }
    cv_task.notify_one(); // wake one thread to run the task

    return future;
  }

  // number of idle threads
  int idlCount() { return idlThrNum; }

};

}

#endif

Not a lot of code: roughly a hundred lines completes the thread pool. And look at commit: no fixed signature, no limit on the number of arguments! This is thanks to variadic templates.

How to use?

Take a look at the following code:


#include "threadpool.h"
#include <iostream>

void fun1(int slp)
{
  // note: the original used printf("%d", std::this_thread::get_id()), which is
  // undefined behavior (std::thread::id is not an int); stream it instead
  std::cout << " hello, fun1 ! " << std::this_thread::get_id() << std::endl;
  if (slp > 0) {
    std::cout << " ======= fun1 sleep " << slp << " ========= " << std::this_thread::get_id() << std::endl;
    std::this_thread::sleep_for(std::chrono::milliseconds(slp));
  }
}

struct gfun {
  int operator()(int n) {
    std::cout << n << " hello, gfun ! " << std::this_thread::get_id() << std::endl;
    return 42;
  }
};

class A { 
public:
  static int Afun(int n = 0) {  // must be static to be committed to the thread pool directly
    std::cout << n << " hello, Afun ! " << std::this_thread::get_id() << std::endl;
    return n;
  }

  static std::string Bfun(int n, std::string str, char c) {
    std::cout << n << " hello, Bfun ! "<< str.c_str() <<" " << (int)c <<" " << std::this_thread::get_id() << std::endl;
    return str;
  }
};

int main()
  try {
    std::threadpool executor{ 50 };
    A a;
    std::future<void> ff = executor.commit(fun1,0);
    std::future<int> fg = executor.commit(gfun{},0);
    std::future<int> gg = executor.commit(a.Afun, 9999); // the IDE may flag this, but it compiles and runs
    std::future<std::string> gh = executor.commit(A::Bfun, 9998,"mult args", 123);
    std::future<std::string> fh = executor.commit([]()->std::string { std::cout << "hello, fh ! " << std::this_thread::get_id() << std::endl; return "hello,fh ret !"; });

    std::cout << " ======= sleep ========= " << std::this_thread::get_id() << std::endl;
    std::this_thread::sleep_for(std::chrono::microseconds(900));

    for (int i = 0; i < 50; i++) {
      executor.commit(fun1,i*100 );
    }
    std::cout << " ======= commit all ========= " << std::this_thread::get_id()<< " idlsize="<<executor.idlCount() << std::endl;

    std::cout << " ======= sleep ========= " << std::this_thread::get_id() << std::endl;
    std::this_thread::sleep_for(std::chrono::seconds(3));

    ff.get(); // calling .get() waits for the task to finish and fetches the return value
    std::cout << fg.get() << " " << fh.get().c_str()<< " " << std::this_thread::get_id() << std::endl;

    std::cout << " ======= sleep ========= " << std::this_thread::get_id() << std::endl;
    std::this_thread::sleep_for(std::chrono::seconds(3));

    std::cout << " ======= fun1,55 ========= " << std::this_thread::get_id() << std::endl;
    executor.commit(fun1,55).get();  // calling .get() waits for the task to finish

    std::cout << "end... " << std::this_thread::get_id() << std::endl;


    std::threadpool pool(4);
    std::vector< std::future<int> > results;

    for (int i = 0; i < 8; ++i) {
      results.emplace_back(
        pool.commit([i] {
          std::cout << "hello " << i << std::endl;
          std::this_thread::sleep_for(std::chrono::seconds(1));
          std::cout << "world " << i << std::endl;
          return i*i;
        })
      );
    }
    std::cout << " ======= commit all2 ========= " << std::this_thread::get_id() << std::endl;

    for (auto && result : results)
      std::cout << result.get() << ' ';
    std::cout << std::endl;
    return 0;
  }
catch (std::exception& e) {
  std::cout << "some unhappy happened... " << std::this_thread::get_id() << e.what() << std::endl;
}

To head off any doubts, a copyright note: the code was "written" by me, but the idea comes from the Internet, in particular the thread pool implementation linked below (this basically copies that implementation, plus a classmate's implementation and explanation — good things are worth copying! — then synthesizes and simplifies them).

How it works

Back to the stock answer: "Maintain a task queue and a thread queue, then take a task, assign it to a thread, and repeat." What is wrong with that line of thinking? A thread pool generally needs to reuse threads, and "assign a task to a thread, then reassign after it finishes" is essentially unsupported at the language level: in most languages (certainly in C++), a thread runs one fixed function and terminates when that function returns. So how could tasks and threads possibly be matched up like that?

The trick: have each thread execute a scheduling function that loops, fetching one task at a time and executing it.

Isn't that idea neat? Every thread runs the same, fixed function, yet the reused threads end up executing different tasks.

Even once you understand the idea, the code still deserves a detailed explanation.

1. A thread pool plus a task queue should do it;
2. The task queue is a classic producer-consumer model, which needs at least two tools: one mutex plus one condition variable, or one mutex plus one semaphore. The mutex is really a lock, ensuring that adding and taking tasks are mutually exclusive; the condition variable provides synchronization when taking a task: while the queue is empty, a thread should wait (block);
3. The load()/store() operations of atomic<bool> are themselves atomic, so no extra mutex is needed around them.
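The mutex-plus-condition-variable pairing in point 2 can be sketched on its own, outside the pool. Here is a minimal, hypothetical example (the function name producer_consumer_sum is mine, not from the pool code): one thread produces numbers into a queue, the consumer blocks on the condition variable while the queue is empty.

```cpp
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>

// A minimal producer-consumer queue: one mutex for mutual exclusion,
// one condition variable so the consumer blocks while the queue is empty.
int producer_consumer_sum(int n)
{
    std::queue<int> q;
    std::mutex m;
    std::condition_variable cv;
    bool done = false;
    int sum = 0;

    std::thread consumer([&] {
        for (;;) {
            std::unique_lock<std::mutex> lock(m);
            cv.wait(lock, [&] { return done || !q.empty(); }); // block while empty
            if (q.empty() && done)
                return;                // no more items will arrive
            sum += q.front();          // take one item under the lock
            q.pop();
        }
    });

    for (int i = 1; i <= n; ++i) {
        {
            std::lock_guard<std::mutex> lock(m); // add under the lock
            q.push(i);
        }
        cv.notify_one(); // wake the consumer
    }
    {
        std::lock_guard<std::mutex> lock(m);
        done = true;     // signal shutdown, analogous to the pool's stoped flag
    }
    cv.notify_all();
    consumer.join();
    return sum; // 1 + 2 + ... + n
}
```

Note how pushing happens under the lock but notify_one() is called after releasing it, exactly the pattern commit uses with cv_task.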

C++ language details

Even understanding the principle does not mean you can write the program; the code above uses quite a few C++11 "tricks", briefly described below.

  • using Task = std::function<void()> is a type alias, a simplification of typedef. std::function<void()> can be thought of as a wrapper for anything callable with the prototype void(): an ordinary function, a function object, or a lambda. void() means no arguments and no return value.
  • pool.emplace_back([this]{...}) and pool.push_back([this]{...}) do the same thing, but the former constructs the std::thread in place and can perform better. Here it constructs a thread object whose entry function is the lambda.
  • All objects are initialized with {} rather than (), because the uniform style is less error-prone.
  • The lambda [this]{...}: [] is the capture list, and this captures the enclosing object's this pointer so the body can reach its members. The body is an infinite loop, blocked by cv_task.wait(lock, [this]{...}).
  • decltype(expr) deduces the type of expr; like auto, it acts as a type placeholder. In a trailing return type such as auto f(A a, B b) -> decltype(a+b), why can't you just write decltype(a+b) f(A a, B b)? Because a and b are not declared yet at that point — that's just how C++ works!
  • The commit method looks odd, doesn't it? It takes as many arguments as you like: the first is the callable f, and the rest are f's arguments. (Note: when passing a struct/class, prefer a pointer, and mind the variable's lifetime.) Variadic templates are a big highlight of C++11, bright enough! As for why it is Args... and args..., because that's how the rules work!
  • Committing directly works for free functions and functors; for class member functions there are two approaches. One is bind: .commit(std::bind(&Dog::sayHello, &dog)); the other is mem_fn: .commit(std::mem_fn(&Dog::sayHello), &dog).
  • make_shared constructs a shared_ptr smart pointer; the general usage is shared_ptr<int> p = make_shared<int>(4), after which *p == 4. The nice thing about smart pointers is that they delete themselves automatically!
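The trailing-return-type and variadic-template machinery that commit relies on can be isolated into a hypothetical, synchronous mini-commit (the names invoke_now, add and greet are mine, for illustration only):

```cpp
#include <string>
#include <utility>

// A minimal synchronous analogue of commit: perfect-forwards any callable
// and any number of arguments, deducing the return type with decltype.
template<class F, class... Args>
auto invoke_now(F&& f, Args&&... args) -> decltype(f(args...))
{
    return std::forward<F>(f)(std::forward<Args>(args)...);
}

int add(int a, int b) { return a + b; }

std::string greet(const std::string& name) { return "hello, " + name; }
```

invoke_now(add, 2, 3) deduces int and invoke_now(greet, std::string("pool")) deduces std::string; commit does the same deduction, only wrapping the call in a packaged_task instead of invoking it immediately.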
  • bind accepts f and some arguments and returns the curried callable; for example, bind(add, 4) yields something like an add4 function!
  • forward(), similar in spirit to move(), deals with forwarding references; how to put it... it passes an argument on without changing the value category it arrived with (an lvalue stays an lvalue, an rvalue stays an rvalue) — "perfect forwarding".
  • packaged_task is the wrapper class for the task function; its future is obtained via get_future(), and the function's return value is then fetched via future.get(). A packaged_task can itself be called with () just like a function.
  • queue is the queue class: front() reads the head element and pop() removes it; back() reads the tail and push() appends to it.
  • lock_guard is the stack (RAII) wrapper of a mutex: lock() on construction, unlock() on destruction — the C++ RAII idea.
  • condition_variable cv: a condition variable must be used with unique_lock; unique_lock's advantage over lock_guard is that you can unlock() and lock() at any time. While blocked, wait itself releases the mutex, and reacquires it once the condition is satisfied.
  • In the thread pool destructor, join() waits until the tasks are finished, which is safe!
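A tiny self-contained demonstration of bind currying and of packaged_task/future outside the pool (the names add_pair, curry_demo and packaged_task_demo are illustrative, not from the article):

```cpp
#include <functional>
#include <future>
#include <thread>

int add_pair(int a, int b) { return a + b; }

// bind currying: fixing the first argument yields an "add4"-like callable.
int curry_demo()
{
    auto add4 = std::bind(add_pair, 4, std::placeholders::_1);
    return add4(10); // add_pair(4, 10)
}

// packaged_task wraps a callable; its future delivers the return value,
// even when the task runs on another thread.
int packaged_task_demo()
{
    std::packaged_task<int(int, int)> task(add_pair);
    std::future<int> fut = task.get_future();
    std::thread t(std::move(task), 20, 22); // run the task on a worker thread
    int result = fut.get();                 // blocks until the value is ready
    t.join();
    return result;
}
```

This is exactly the pipeline commit builds: bind fixes the arguments, packaged_task wraps the resulting callable, and the future carries the result back to the caller.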

Git

The code lives on git; you can get the latest version here: https://github.com/lzpong/threadpool

[Sources: http://blog.csdn.net/zdarks/article/details/46994607, https://github.com/progschj/ThreadPool/blob/master/ThreadPool.h]
