C++ version of the thread and task pool example

  • 2020-04-02 02:10:57
  • OfStack

commondef.h


// Tuning constants shared by the task pool and the thread pool.

// Seconds the TaskPool monitor thread waits between two sweeps of the idle-task list.
// During a sweep, idle tasks older than TASK_DESTROY_INTERVAL are destroyed.
const int CHECK_IDLE_TASK_INTERVAL = 300;
// Seconds a Task may stay in the idle list before a sweep destroys it.
const int TASK_DESTROY_INTERVAL = 60; 
// Microseconds (passed to usleep) between two polls of the thread pool while
// waiting for every worker thread to exit.
const int IDLE_CHECK_POLL_EMPTY = 500;
// Seconds an idle worker thread waits for work before it exits on its own (5 minutes).
const int  THREAD_WAIT_TIME_OUT = 300;

taskpool.cpp


#include "taskpool.h"
#include <string.h>
#include <stdio.h>
#include <pthread.h>
    TaskPool::TaskPool(const int & poolMaxSize)
    : m_poolSize(poolMaxSize)
      , m_taskListSize(0) 
      , m_bStop(false)
{
    pthread_mutex_init(&m_lock, NULL);
    pthread_mutex_init(&m_idleMutex, NULL);
    pthread_cond_init(&m_idleCond, NULL);
    pthread_attr_t attr;
    pthread_attr_init( &attr );
    pthread_attr_setdetachstate( &attr, PTHREAD_CREATE_JOINABLE ); //Let the thread run independently
    pthread_create(&m_idleId, &attr, CheckIdleTask, this); //Create the monitor idle task process
    pthread_attr_destroy(&attr);
}
TaskPool::~TaskPool()
{
    if(!m_bStop)
    {
        StopPool();
    }
    if(!m_taskList.empty())
    {
        std::list<Task*>::iterator it = m_taskList.begin();
        for(; it != m_taskList.end(); ++it)
        {
            if(*it != NULL)
            {
                delete *it;
                *it = NULL;
            }
        }
        m_taskList.clear();
        m_taskListSize = 0;
    }
    if(!m_idleList.empty())
    {
        std::list<Task*>::iterator it = m_idleList.begin();
        for(; it != m_idleList.end(); ++it)
        {
            if(*it != NULL)
            {
                delete *it;
                *it = NULL;
            }
        }
        m_idleList.clear();
    }

    pthread_mutex_destroy(&m_lock);
    pthread_mutex_destroy(&m_idleMutex);
    pthread_cond_destroy(&m_idleCond);
}
void * TaskPool::CheckIdleTask(void * arg)
{
    TaskPool * pool = (TaskPool*)arg;
    while(1)
    {
        pool->LockIdle();
        pool->RemoveIdleTask();
        if(pool->GetStop())
        {
            pool->UnlockIdle();
            break;
        }
        pool->CheckIdleWait();
        pool->UnlockIdle();
    }
}
void TaskPool::StopPool()
{
    m_bStop = true;
    LockIdle();
    pthread_cond_signal(&m_idleCond); //Prevents the monitoring thread from waiting and causing an unegress problem
    UnlockIdle();
    pthread_join(m_idleId, NULL);
}
// Returns the stop flag that StopPool() sets.
bool TaskPool::GetStop()
{
    return m_bStop;
}
void TaskPool::CheckIdleWait()
{
    struct timespec timeout;
    memset(&timeout, 0, sizeof(timeout));
    timeout.tv_sec = time(0) + CHECK_IDLE_TASK_INTERVAL;
    timeout.tv_nsec = 0;
    pthread_cond_timedwait(&m_idleCond, &m_idleMutex, &timeout);
}
int TaskPool::RemoveIdleTask()
{
    int iRet = 0;
    std::list<Task*>::iterator it, next;
    std::list<Task*>::reverse_iterator rit = m_idleList.rbegin();
    time_t curTime = time(0);
    for(; rit != m_idleList.rend(); )
    {
        it = --rit.base();
        if(difftime(curTime,((*it)->last_time)) >= TASK_DESTROY_INTERVAL)
        {
            iRet++;
            delete *it;
            *it = NULL;
            next = m_idleList.erase(it);
            rit = std::list<Task*>::reverse_iterator(next);
        }
        else
        {
            break;    
        }
    }
}
int TaskPool::AddTask(task_fun fun, void *arg)
{
    int iRet = 0;
    if(0 != fun)
    {
        pthread_mutex_lock(&m_lock);
        if(m_taskListSize >= m_poolSize)
        {
            pthread_mutex_unlock(&m_lock);
            iRet = -1; //task pool is full;
        }
        else
        {
            pthread_mutex_unlock(&m_lock);
            Task * task = GetIdleTask();
            if(NULL == task)
            {
                task = new Task;
            }
            if(NULL == task)
            {
                iRet = -2; // new failed 
            }
            else
            {
                task->fun = fun;
                task->data = arg;
                pthread_mutex_lock(&m_lock);
                m_taskList.push_back(task);
                ++m_taskListSize;
                pthread_mutex_unlock(&m_lock);
            }
        }
    }
    return iRet;
}
// Removes and returns the oldest queued task, or NULL when the queue is empty.
Task* TaskPool::GetTask()
{
    pthread_mutex_lock(&m_lock);
    Task * next = NULL;
    if(!m_taskList.empty())
    {
        next = m_taskList.front();
        m_taskList.pop_front();
        --m_taskListSize;
    }
    pthread_mutex_unlock(&m_lock);
    return next;
}
// Acquires the mutex that guards the idle-task list.
void TaskPool::LockIdle()
{
    pthread_mutex_lock(&m_idleMutex);
}
// Releases the mutex that guards the idle-task list.
void TaskPool::UnlockIdle()
{
    pthread_mutex_unlock(&m_idleMutex);
}
// Takes the most recently recycled Task from the idle list.
// Returns NULL when the idle list is empty.
Task * TaskPool::GetIdleTask()
{
    Task * recycled = NULL;
    LockIdle();
    if(!m_idleList.empty())
    {
        recycled = m_idleList.front();
        m_idleList.pop_front();
    }
    UnlockIdle();
    return recycled;
}
// Returns a finished Task to the idle list so AddTask() can reuse it.
// The task is cleared and time-stamped; the stamp is what RemoveIdleTask()
// compares against TASK_DESTROY_INTERVAL. A NULL task is ignored.
void TaskPool::SaveIdleTask(Task*task)
{
    if(NULL == task)
    {
        return;
    }
    task->fun = 0;
    task->data = NULL;
    task->last_time = time(0);

    LockIdle();
    m_idleList.push_front(task); // newest first, so the oldest end up at the back
    UnlockIdle();
}

taskpool.h


#ifndef TASKPOOL_H
#define TASKPOOL_H

#include <list>
#include <pthread.h>
#include "commondef.h"
// Every unit of user work is a task: a function pointer plus an opaque argument.
typedef void (*task_fun)(void *);
// One queued (or recycled) unit of work.
struct Task
{
    task_fun fun; // Function to execute
    void* data; // Argument passed to fun
    time_t last_time; // Time the task entered the idle list; used to decide when to destroy it
};
//Task pool, all tasks are posted to the task pool, and the administrative thread is responsible for Posting tasks to the thread pool
class TaskPool
{
public:
  
    TaskPool(const int & poolMaxSize);
    ~TaskPool();
        
    int AddTask(task_fun fun, void* arg);

        
    Task* GetTask();
    
    void SaveIdleTask(Task*task);

    void StopPool();
public:
    void LockIdle();
    void UnlockIdle();
    void CheckIdleWait();
    int RemoveIdleTask();
    bool GetStop();
private:
    static void * CheckIdleTask(void *);
    
    Task* GetIdleTask();
    int GetTaskSize();
private:
    int m_poolSize; //Task pool size
    int m_taskListSize; //Count the size of the taskList, because the time will increase as the number of lists increases
    bool m_bStop; //Whether or not to stop
    std::list<Task*> m_taskList;//List of tasks to be processed
    std::list<Task*> m_idleList;//All free task lists
    pthread_mutex_t m_lock; //Locks the task list to ensure that only one task is taken at a time
    pthread_mutex_t m_idleMutex; //Idle task queue lock
    pthread_cond_t m_idleCond; //Idle queue wait condition
    pthread_t m_idleId;;
};
#endif

threadpool.cpp



#include "threadpool.h"
#include <errno.h>
#include <string.h>

// Prepares the bookkeeping for one pool thread; the thread itself is started
// later by whoever calls pthread_create with m_attr.
//   detach : true to create the thread detached, false to create it joinable.
//   pool   : the pool this thread belongs to.
Thread::Thread(bool detach, ThreadPool * pool)
    : m_pool(pool)
{
    const int detachState = detach ? PTHREAD_CREATE_DETACHED
                                   : PTHREAD_CREATE_JOINABLE;
    pthread_attr_init(&m_attr);
    pthread_attr_setdetachstate(&m_attr, detachState);

    pthread_mutex_init(&m_mutex, NULL);
    pthread_cond_init(&m_cond, NULL);

    // No task assigned yet.
    task.fun = 0;
    task.data = NULL;
}
// Releases the condition variable, mutex and attribute object created by the
// constructor. It does not stop or join the underlying pthread.
Thread::~Thread()
{
    pthread_cond_destroy(&m_cond);
    pthread_mutex_destroy(&m_mutex);
    pthread_attr_destroy(&m_attr);
}
    ThreadPool::ThreadPool()
    : m_poolMax(0)
    , m_idleNum(0)
    , m_totalNum(0)
      , m_bStop(false)
{
    pthread_mutex_init(&m_mutex, NULL);
    pthread_mutex_init(&m_runMutex,NULL);
    pthread_mutex_init(&m_terminalMutex, NULL);
    pthread_cond_init(&m_terminalCond, NULL);
    pthread_cond_init(&m_emptyCond, NULL);
}
// Destroys the pool's synchronisation objects.
// It neither stops nor waits for worker threads; StopPool(true) does that.
ThreadPool::~ThreadPool()
{
    pthread_cond_destroy(&m_emptyCond);
    pthread_cond_destroy(&m_terminalCond);

    pthread_mutex_destroy(&m_terminalMutex);
    pthread_mutex_destroy(&m_runMutex);
    pthread_mutex_destroy(&m_mutex);
}
int ThreadPool::InitPool(const int & poolMax, const int & poolPre)
{
    if(poolMax < poolPre 
            || poolPre < 0 
            || poolMax <= 0)
    {
        return -1;
    }
    m_poolMax = poolMax;
    int iRet = 0;
    for(int i=0; i<poolPre; ++i)
    {
        Thread * thread = CreateThread();
        if(NULL == thread)
        {
            iRet = -2;
        }
    }

    if(iRet < 0)
    { 
        std::list<Thread*>::iterator it = m_threads.begin();
        for(; it!= m_threads.end(); ++it)
        {
            if(NULL != (*it) )
            {
                delete *it;
                *it = NULL;
            }
        }
        m_threads.clear();
        m_totalNum = 0;
    }
    return iRet;
}
// Takes one idle worker from the pool, blocking until one is available, and
// hands it fun(arg) to execute.
void ThreadPool::GetThreadRun(task_fun fun, void* arg)
{
    pthread_mutex_lock(&m_mutex);
    // Re-test the predicate after every wakeup: pthread_cond_wait may return
    // spuriously, and front() on an empty list is undefined behaviour.
    while(m_threads.empty())
    {
        pthread_cond_wait(&m_emptyCond, &m_mutex); // wait for a worker to become idle
    }
    Thread * worker = m_threads.front();
    m_threads.pop_front();
    pthread_mutex_unlock(&m_mutex);

    // Publish the task under the worker's own mutex, then wake it.
    pthread_mutex_lock(&worker->m_mutex);
    worker->task.fun = fun;
    worker->task.data = arg;
    pthread_cond_signal(&worker->m_cond); // resumes the worker's loop in WapperFun
    pthread_mutex_unlock(&worker->m_mutex);
}
int ThreadPool::Run(task_fun fun, void * arg) 
{
    pthread_mutex_lock(&m_runMutex); //Ensure that only one thread can execute at a time
    int iRet = 0;
    if(m_totalNum <m_poolMax) //
    {
        if(m_threads.empty() && (NULL == CreateThread()) )
        {
            iRet = -1;//can not create new thread!
        }
        else
        {
            GetThreadRun(fun, arg);
        }
    }
    else
    {
        GetThreadRun(fun, arg);
    }
    pthread_mutex_unlock(&m_runMutex);
    return iRet;
}
void ThreadPool::StopPool(bool bStop)
{
    m_bStop = bStop;
    if(bStop)
    {
        //Starts a thread that monitors whether all idle threads exit
        Thread thread(false, this);
        pthread_create(&thread.m_threadId,&thread.m_attr, ThreadPool::TerminalCheck , &thread); //Start monitoring all threads to exit the thread
        //Block waits for all idle threads to exit
        pthread_join(thread.m_threadId, NULL);
    }
    /*if(bStop)
    {
        pthread_mutex_lock(&m_terminalMutex);
        //Starts a thread that monitors whether all idle threads exit
        Thread thread(true, this);
        pthread_create(&thread.m_threadId,&thread.m_attr, ThreadPool::TerminalCheck , &thread); //Start monitoring all threads to exit the thread
        //Block waits for all idle threads to exit
        pthread_cond_wait(&m_terminalCond, & m_terminalMutex);
        pthread_mutex_unlock(&m_terminalMutex);
    }*/
}
// Returns the stop flag last set by StopPool().
bool ThreadPool::GetStop()
{
    return m_bStop;
}
// Starts one detached worker thread running WapperFun.
// Returns the Thread descriptor on success, NULL when the thread could not be
// started. The worker adds itself to the idle list from WapperFun.
Thread * ThreadPool::CreateThread()
{
    Thread * worker = new Thread(true, this);
    if(NULL == worker)
    {
        return NULL;
    }

    const int rc = pthread_create(&worker->m_threadId, &worker->m_attr,
            ThreadPool::WapperFun, worker);
    if(0 != rc)
    {
        delete worker;
        return NULL;
    }
    return worker;
}
// Thread function of every worker thread.
// arg is the worker's Thread descriptor. The loop runs the assigned task (if
// any), parks the thread in the pool's idle list and waits on the thread's own
// condition variable for the next task. The loop ends when the pool's stop
// flag is set or when the wait times out after THREAD_WAIT_TIME_OUT seconds;
// the thread then unregisters itself and deletes its own descriptor.
// Returns NULL if arg or its pool pointer is NULL, 0 otherwise.
void * ThreadPool::WapperFun(void*arg)
{
    Thread * thread = (Thread*)arg;
    if(NULL == thread || NULL == thread->m_pool)
    {
        return NULL;
    }
    ThreadPool * pool = thread->m_pool;
    pool->IncreaseTotalNum(); // the thread counts itself once it is running
    struct timespec abstime;
    memset(&abstime, 0, sizeof(abstime));
    while(1)
    {
        // On the first pass no task is assigned yet (the Thread constructor
        // sets task.fun to 0).
        if(0 != thread->task.fun)
        {
            thread->task.fun(thread->task.data);
        }
        if( true == pool->GetStop() )  
        {
            break; // Exit only after the current task has completed
        }
        // The thread's mutex is taken before it is listed as idle, so
        // GetThreadRun (which locks the same mutex before signalling) cannot
        // signal until this thread is inside pthread_cond_timedwait.
        pthread_mutex_lock( &thread->m_mutex );
        pool->SaveIdleThread(thread); // Clears the task and adds the thread to the idle list
        abstime.tv_sec = time(0) + THREAD_WAIT_TIME_OUT;
        abstime.tv_nsec = 0;
        // NOTE(review): a wakeup without a new task (POSIX permits spurious
        // wakeups) leads back to SaveIdleThread while the thread may still be
        // in the idle list - confirm duplicates cannot occur.
        if(ETIMEDOUT  == pthread_cond_timedwait( &thread->m_cond, &thread->m_mutex, &abstime )) // Wait for a task, or give up after the idle timeout
        {
            pthread_mutex_unlock( &thread->m_mutex );
            break;
        }
        pthread_mutex_unlock( &thread->m_mutex );
    }
    // NOTE(review): after a timeout the thread is still in the idle list until
    // RemoveThread below runs; presumably GetThreadRun could pop it in that
    // window - verify.
    pool->LockMutex();
    pool->DecreaseTotalNum(); // does not lock by itself; the pool mutex is held here
    if(thread != NULL)
    {
        pool->RemoveThread(thread);
        delete thread;
        thread = NULL;
    }
    pool->UnlockMutex();
    return 0;
}
// Clears the worker's task and puts the worker at the front of the idle list.
// If the list was empty, callers blocked in GetThreadRun are woken.
// A NULL thread is ignored.
void ThreadPool::SaveIdleThread(Thread * thread )
{
    if(!thread)
    {
        return;
    }
    thread->task.fun = 0;
    thread->task.data = NULL;

    LockMutex();
    const bool wasEmpty = m_threads.empty();
    m_threads.push_front(thread);
    if(wasEmpty)
    {
        // Woken waiters cannot proceed before UnlockMutex() below, so
        // broadcasting after the push is equivalent to broadcasting before it.
        pthread_cond_broadcast(&m_emptyCond);
    }
    UnlockMutex();
}
// Returns the current worker-thread count. The value is read without taking
// the pool mutex.
int ThreadPool::TotalThreads()
{
    return m_totalNum;
}

void ThreadPool::SendSignal()
{
    LockMutex();
    std::list<Thread*>::iterator it = m_threads.begin();
    for(; it!= m_threads.end(); ++it)
    {
        pthread_mutex_lock( &(*it)->m_mutex );
        pthread_cond_signal(&((*it)->m_cond));
        pthread_mutex_unlock( &(*it)->m_mutex );
    }
    UnlockMutex();
}
void * ThreadPool::TerminalCheck(void* arg)
{
    Thread * thread = (Thread*)arg;
    if(NULL == thread || NULL == thread->m_pool)
    {
        return NULL;
    }
    ThreadPool * pool = thread->m_pool;
    while((false == pool->GetStop()) || pool->TotalThreads() >0 )
    {
        pool->SendSignal();
        usleep(IDLE_CHECK_POLL_EMPTY);
    }
    //pool->TerminalCondSignal();
    return 0;
}
// Signals the termination condition variable. The only call visible in
// threadpool.cpp is commented out (in TerminalCheck).
void ThreadPool::TerminalCondSignal()
{
    pthread_cond_signal(&m_terminalCond);
}
// Removes every occurrence of thread from the idle list. It takes no lock
// itself; WapperFun calls it while holding the pool mutex.
void ThreadPool::RemoveThread(Thread* thread)
{
    m_threads.remove(thread);
}
// Acquires the mutex that guards the idle-thread list.
void ThreadPool::LockMutex()
{
    pthread_mutex_lock( &m_mutex);
}
// Releases the mutex that guards the idle-thread list.
void ThreadPool::UnlockMutex()
{
    pthread_mutex_unlock( &m_mutex );
}
// Increments the worker-thread count while holding the pool mutex.
void ThreadPool::IncreaseTotalNum()
{
    LockMutex();
    m_totalNum++;
    UnlockMutex();
}
// Decrements the worker-thread count. Unlike IncreaseTotalNum it takes no
// lock; WapperFun calls it between LockMutex() and UnlockMutex().
void ThreadPool::DecreaseTotalNum()
{
    m_totalNum--;
}

threadpool.h


#ifndef THREADPOOL_H
#define THREADPOOL_H

#include <list>
#include <string>
#include "taskpool.h"
//The task scheduling process is controlled through threadmanager
//The TerminalCheck thread of a threadpool is responsible for monitoring the exit of all threads from the threadpool

class ThreadPool;
// Descriptor of one pool thread: the pthread handle, the synchronisation
// objects used to hand it a task, and the task itself.
class Thread
{
public:
    // detach selects a detached (true) or joinable (false) thread attribute;
    // pool is the owning pool.
    Thread(bool detach, ThreadPool * pool);
    ~Thread();
    pthread_t  m_threadId; // Thread id, filled in by pthread_create
    pthread_mutex_t m_mutex; // Guards task and pairs with m_cond
    pthread_cond_t m_cond; // Signalled when a task has been assigned
    pthread_attr_t m_attr; // Attributes used when the thread is created
 Task  task; // Task currently assigned to the thread (fun is 0 when there is none)
    ThreadPool * m_pool; // Owning thread pool
};
// Thread pool: creates worker threads that execute tasks. After finishing a
// task a worker puts itself back into the idle list and waits for the next one.
class ThreadPool
{
public:
    ThreadPool();
    ~ThreadPool();
    
    // Sets the maximum thread count (poolMax) and pre-creates poolPre workers.
    // Returns a negative value on failure.
    int InitPool(const int & poolMax, const int & poolPre);
    
    // Runs fun(arg) on a pool thread; returns -1 if a needed thread could not be created.
    int Run(task_fun fun, void* arg);

 
 // Sets the stop flag; with bStop == true it also waits for all workers to exit.
 void StopPool(bool bStop);

public: // These functions are public mainly so that the static thread functions can call them
    
    bool GetStop();    
 void SaveIdleThread(Thread * thread );
    void LockMutex();
    void UnlockMutex();
    void DecreaseTotalNum();
    void IncreaseTotalNum();
    void RemoveThread(Thread* thread);
    void TerminalCondSignal();
    int TotalThreads();
    void SendSignal();
private:
 
 // Starts one detached worker thread; returns NULL on failure.
 Thread * CreateThread();
    
    // Hands fun(arg) to an idle worker, waiting for one if necessary.
    void GetThreadRun(task_fun fun, void* arg);
 static void * WapperFun(void*); // Thread function of the worker threads
 static void * TerminalCheck(void*); // Loops until all worker threads have terminated
private:
    int m_poolMax; // Maximum number of threads in the thread pool
    int m_idleNum; // Number of idle threads (only initialised to 0 in the visible sources)
    int m_totalNum; // Current total number of threads
 bool m_bStop; // Whether to stop the thread pool
 pthread_mutex_t m_mutex; // Guards the thread list
 pthread_mutex_t m_runMutex; // Serialises calls to Run
    pthread_mutex_t m_terminalMutex; // Mutex paired with m_terminalCond
    pthread_cond_t  m_terminalCond; // Condition variable for "all threads terminated"
    pthread_cond_t  m_emptyCond; // Condition variable for "the idle list is no longer empty"

    std::list<Thread*> m_threads; // List of idle threads
};
#endif

threadpoolmanager.cpp


#include "threadpoolmanager.h"
#include "threadpool.h"
#include "taskpool.h"
#include <errno.h>
#include <string.h>
/*#include <string.h>
#include <sys/time.h>
#include <stdio.h>*/
 //   struct timeval time_beg, time_end;
// Creates a manager with no pools attached; Init() allocates the pools and
// starts the dispatcher thread. m_taskThreadId is left unset here.
ThreadPoolManager::ThreadPoolManager() 
    : m_threadPool(NULL)
    , m_taskPool(NULL)
    , m_bStop(false)
{
    pthread_mutex_init(&m_mutex_task,NULL);
    pthread_cond_init(&m_cond_task, NULL);
   
}
// Stops all threads, frees both pools and destroys the dispatcher's
// synchronisation objects.
ThreadPoolManager::~ThreadPoolManager()
{
    StopAll();

    // delete on a null pointer is a no-op, so no explicit NULL checks are needed.
    delete m_threadPool;
    m_threadPool = NULL;

    delete m_taskPool;
    m_taskPool = NULL;

    pthread_cond_destroy(&m_cond_task);
    pthread_mutex_destroy(&m_mutex_task);
}
int ThreadPoolManager::Init(
        const int &tastPoolSize,
        const int &threadPoolMax,
        const int &threadPoolPre)
{
    m_threadPool = new ThreadPool();
    if(NULL == m_threadPool)
    {
        return -1;
    }
    m_taskPool = new TaskPool(tastPoolSize);
    if(NULL == m_taskPool)
    {
        return -2;
    }
    if(0>m_threadPool->InitPool(threadPoolMax, threadPoolPre))
    {
        return -3;
    }
    //Start thread pool
    //Start task pool
    //Start the task acquisition thread and keep taking tasks from the task pool to the thread pool
    pthread_attr_t attr;
    pthread_attr_init( &attr );
    pthread_attr_setdetachstate( &attr, PTHREAD_CREATE_JOINABLE );
    pthread_create(&m_taskThreadId, &attr, TaskThread, this); //Create the fetch task process
    pthread_attr_destroy(&attr);
    return 0;
}
void ThreadPoolManager::StopAll()
{
    m_bStop = true;
    LockTask();
    pthread_cond_signal(&m_cond_task);
    UnlockTask();
    pthread_join(m_taskThreadId, NULL);
    //Wait for all current tasks to complete
    m_taskPool->StopPool();
    m_threadPool->StopPool(true); //Stop thread pool work
}
// Acquires the dispatcher mutex (the one paired with m_cond_task).
void ThreadPoolManager::LockTask()
{
    pthread_mutex_lock(&m_mutex_task);
}
// Releases the dispatcher mutex.
void ThreadPoolManager::UnlockTask()
{
    pthread_mutex_unlock(&m_mutex_task);
}
void* ThreadPoolManager::TaskThread(void* arg)
{
    ThreadPoolManager * manager = (ThreadPoolManager*)arg;
    while(1)
    {
        manager->LockTask(); //A stop signal was sent to prevent the task from not being completed
        while(1) //Complete the task in the task queue before exiting
        {
            Task * task = manager->GetTaskPool()->GetTask();
            if(NULL == task)
            {
                break;
            }
            else
            {
                manager->GetThreadPool()->Run(task->fun, task->data);
                manager->GetTaskPool()->SaveIdleTask(task);
            }
        }
        if(manager->GetStop())
        {
            manager->UnlockTask();
            break;
        }
        manager->TaskCondWait(); //Wait for a task to be executed
        manager->UnlockTask();
    }
    return 0;
}
// Returns the thread pool created by Init() (NULL before Init()).
ThreadPool * ThreadPoolManager::GetThreadPool()
{
    return m_threadPool;
}
// Returns the task pool created by Init() (NULL before Init()).
TaskPool * ThreadPoolManager::GetTaskPool()
{
    return m_taskPool;
}
int  ThreadPoolManager::Run(task_fun fun,void* arg)
{
    if(0 == fun)
    {
        return 0;
    }
    if(!m_bStop)
    {   
        int iRet =  m_taskPool->AddTask(fun, arg);
        if(iRet == 0 && (0 == pthread_mutex_trylock(&m_mutex_task)) ) 
        {
            pthread_cond_signal(&m_cond_task);
            UnlockTask();
        }
        return iRet;
    }
    else
    {
        return -3;
    }
}
// Returns true once the manager has been asked to stop.
bool ThreadPoolManager::GetStop()
{
    return m_bStop;
}
void ThreadPoolManager::TaskCondWait()
{
    struct timespec to;
    memset(&to, 0, sizeof to);
    to.tv_sec = time(0) + 60;
    to.tv_nsec = 0;
    pthread_cond_timedwait( &m_cond_task, &m_mutex_task, &to); //60-second timeouts
}

threadpoolmanager.h


#ifndef THREADPOOLMANAGER_H
#define THREADPOOLMANAGER_H
/* purpose @
 *       Basic flow:
 *           The manager owns a thread pool and a task pool. Tasks are first added to the task pool; TaskThread then takes them out of the task pool and hands them to the thread pool.
 *       Basic features:
 *          1. Worker threads that have been unused for a long time exit automatically when the workload is light.
 *          2. Task-pool resources that have been unused for a long time are released automatically when the workload is light (the intervals can be changed in commondef.h).
 *          3. Once the program stops adding tasks to the task pool, the related threads exit only after all tasks in the task pool have been processed.
 *       Thread resources:
 *           If no worker threads are pre-created, ThreadPool creates threads only when a task is available; the maximum number of threads is specified by the user.
 *           When the manager is destroyed, it creates a monitoring thread that waits for all tasks to complete; the manager is destroyed only after all tasks have completed.
 *           Maximum number of threads: 1 TaskPool thread + 1 manager task-scheduling thread + the ThreadPool maximum number of threads + 1 manager exit-monitoring thread + 1 thread-pool monitoring thread that waits for all threads to exit.
 *           Minimum number of threads: 1 TaskPool monitoring thread that destroys idle task resources + 1 manager task-scheduling thread.
 *       Usage:
 *          ThreadPoolManager manager;
 *          manager.Init(100000, 50, 5); // Initializes a manager with a task pool of 100000 tasks, at most 50 threads in the thread pool, and 5 pre-created threads
 *          manager.Run(fun, data); // Adds a task to the manager; fun is the function pointer, data is the argument passed to fun and may be NULL
 *
 * date    @ 2013.12.23
 * author  @ haibin.wang
 *
 *   Detailed parameters can be tuned by changing the values of the related constants in commondef.h.
 */
#include <pthread.h>
// Signature of a task function: receives the opaque argument given to Run().
typedef void (*task_fun)(void *);
class ThreadPool;
class TaskPool;
// Front end that owns a TaskPool and a ThreadPool and runs a dispatcher thread
// moving tasks from the former to the latter.
class ThreadPoolManager
{
public:
    ThreadPoolManager();
    // Stops all threads (via StopAll) and frees both pools.
    ~ThreadPoolManager();
    
    // Creates the pools and starts the dispatcher thread.
    // Returns 0 on success, a negative value on failure.
    int Init(const int &tastPoolSize,
            const int &threadPoolMax,
            const int &threadPoolPre);
    
    // Queues fun(arg); returns 0 on success, a negative value otherwise.
    int Run(task_fun fun,void* arg=NULL);

public: // The following public functions are mainly used by the static thread function
    bool GetStop();
    void TaskCondWait();
    TaskPool * GetTaskPool();
    ThreadPool * GetThreadPool();
    void LockTask();
    void UnlockTask();
    void LockFull(); // NOTE(review): no definition is visible in threadpoolmanager.cpp - confirm it is unused
private:
 static void * TaskThread(void*); // Thread function of the dispatcher thread
 void StopAll(); // Stops the dispatcher thread and both pools

private:
    ThreadPool *m_threadPool; // The thread pool
    TaskPool * m_taskPool; // The task pool
    bool m_bStop; // Whether the manager has been asked to terminate

    pthread_t m_taskThreadId; // Id of the dispatcher (TaskThread) thread
 pthread_mutex_t m_mutex_task; // Mutex paired with m_cond_task
    pthread_cond_t m_cond_task; // Signalled when a task has been queued or the manager stops
};
#endif

main.cpp


#include <iostream>
#include <string>
#include "threadpoolmanager.h"
#include <sys/time.h>
#include <string.h>
#include <stdlib.h>
#include <pthread.h>

using namespace std;
int seq = 0;      // Number of tasks executed since the last wrap-around
int billNum =0;   // Number of times seq has reached one billion
int inter = 1;    // Print a progress line every `inter` executions
pthread_mutex_t m_mutex; // Guards seq and billNum
// Demo task: counts executions and prints a progress line every `inter` calls.
// arg is unused.
void myFunc(void*arg)
{
    pthread_mutex_lock(&m_mutex);
    seq++;
    // inter comes from the command line; a value of 0 (or a non-numeric
    // argument, which atoi turns into 0) would make the modulo divide by
    // zero, so progress output is simply disabled for non-positive values.
    if(inter > 0 && seq%inter == 0 )
    {
        cout << "fun 1=" << seq << endl;
    }
    if(seq>=1000000000)
    {
        cout << "billion" << endl;
        seq = 0;
        billNum++;
    }
    pthread_mutex_unlock(&m_mutex);
}
int main(int argc, char** argv)
{
    if(argc != 6)
    {
        cout << " There must be 5 A parameter   Task execution times   Task pool size   Thread pool size   Number of pre-created threads   The output gap " << endl;
        cout << "eg: ./test 999999 10000 100 10 20" << endl;
        cout << " The above example represents creating an interval 20 The task pool size is 10000 , the thread pool size is 100 , create 10 The number of execution times is: 999999" << endl;
        return 0;
    }
    double loopSize = atof(argv[1]);
    int taskSize = atoi(argv[2]);
    int threadPoolSize = atoi(argv[3]);
    int preSize = atoi(argv[4]);
    inter = atoi(argv[5]);
    pthread_mutex_init(&m_mutex,NULL);
    ThreadPoolManager manager;
    if(0>manager.Init(taskSize,  threadPoolSize, preSize))
    {
        cout << " Initialization failure " << endl;
        return 0;
    }
    cout << "******************* Initialization complete *********************" << endl;
    struct timeval time_beg, time_end;
    memset(&time_beg, 0, sizeof(struct timeval));
    memset(&time_end, 0, sizeof(struct timeval)); 
    gettimeofday(&time_beg, NULL);
    double i=0;
    for(; i<loopSize; ++i)
    {
        while(0>manager.Run(myFunc,NULL))
        {
            usleep(100);
        }
    }
    gettimeofday(&time_end, NULL);
    long total = (time_end.tv_sec - time_beg.tv_sec)*1000000 + (time_end.tv_usec - time_beg.tv_usec);
    cout << "total time =" << total << endl;
    cout << "total num =" << i  << " billion num=" << billNum<< endl;
    cout << __FILE__ << " All threads will be closed " << endl;
    //pthread_mutex_destroy(&m_mutex);
    return 0;
}


Related articles: