C++ thread pool implementation method

  • 2020-04-02 03:08:51
  • OfStack

This article illustrates the c++ thread pool implementation. Share with you for your reference. Specific analysis is as follows:

The following thread pool is the one I used in my work. The principle is to set up a task queue, let multiple threads mutually exclusive in the queue to take out the task, and then execute, obviously, the queue is to lock

Environment: ubuntu Linux

File name: locker. H


#ifndef LOCKER_H_ 
#define LOCKER_H_ 
#include "pthread.h" 
class locker 
{ 
public: 
  locker(); 
  virtual ~locker(); 
  bool lock(); 
  void unlock(); 
private: 
  pthread_mutex_t   m_mutex; 
}; 
#endif  

File name: locker. CPP


#include "locker.h" 
locker::locker() 
{ 
  pthread_mutex_init(&m_mutex, 0); 
} 
locker::~locker() 
{ 
  pthread_mutex_destroy(&m_mutex); 
} 
bool locker::lock() 
{ 
  if(0 == pthread_mutex_lock(&m_mutex)) 
    return true; 
  return false; 
} 
void locker::unlock() 
{ 
  pthread_mutex_unlock(&m_mutex); 
}

File name: task_list.h


#ifndef TASK_LIST_H_ 
#define TASK_LIST_H_ 
#include "list" 
#include "locker.h" 
#include "netinet/in.h" 
#include "semaphore.h" 
using namespace std; 
typedef void* (*THREAD_FUNC)(void*); 
//Tasks running in the thread pool. For downstream tasks, the sin contains the destination address information
//Parm0 points to the object sending the data, parm1 points to the data, and parm2 is the length of the data
typedef struct 
{ 
  THREAD_FUNC func; 
  void* parm0; 
  void* parm1; 
  void* parm2; 
} task_info; 
typedef list<task_info*> TASK_LIST; 
typedef list<task_info*>::iterator PTASK_LIST; 
class task_list 
{ 
public: 
  task_list(); 
  virtual ~task_list(); 
  void append_task(task_info* tsk); 
  task_info* fetch_task(); 
private: 
  TASK_LIST m_tasklist; 
  locker m_lk; 
  sem_t m_sem; 
}; 
#endif 

File name: task_list.cpp


#include "task_list.h" 
task_list::task_list() 
{ 
  // Init Semaphore 
  sem_init(&m_sem, 0, 0); 
  m_tasklist.clear(); 
} 
task_list::~task_list() 
{ 
  while(!m_tasklist.empty()) 
  { 
    task_info* tr = m_tasklist.front(); 
    m_tasklist.pop_front(); 
    if(tr) 
      delete tr; 
  } 
  // Destroy Semaphore 
  sem_destroy(&m_sem); 
} 
void task_list::append_task(task_info* tsk) 
{ 
  // Lock before Modify the list 
  m_lk.lock(); 
  m_tasklist.push_back(tsk); 
  m_lk.unlock(); 
  // Increase the Semaphore 
  sem_post(&m_sem); 
} 
task_info* task_list::fetch_task() 
{ 
  task_info* tr = NULL; 
  sem_wait(&m_sem); 
  m_lk.lock(); 
  tr = m_tasklist.front(); 
  m_tasklist.pop_front(); 
  m_lk.unlock(); 
  return tr; 
}

File name: thread_pool.h


#ifndef THREAD_POOL_H_ 
#define THREAD_POOL_H_ 
#include "task_list.h" 
#include "pthread.h" 
#define DEFAULT_THREAD_COUNT  4 
#define MAXIMUM_THREAD_COUNT  1000 
class thread_pool 
{ 
public: 
  thread_pool(); 
  virtual ~thread_pool(); 
  int create_threads(int n = DEFAULT_THREAD_COUNT); 
  void delete_threads(); 
  void set_tasklist(task_list* plist); 
  void del_tasklist(); 
protected: 
  static void* thread_func(void* parm); 
  task_info* get_task(); 
private: 
  int       m_thread_cnt; 
  pthread_t    m_pids[MAXIMUM_THREAD_COUNT]; 
  task_list*   m_tasklist; 
}; 
#endif  

The file name: thread_pool. CPP


#include "thread_pool.h" 
thread_pool::thread_pool() 
{ 
  m_thread_cnt = 0; 
  m_tasklist = NULL; 
} 
thread_pool::~thread_pool() 
{ 
  delete_threads(); 
} 
task_info* thread_pool::get_task() 
{ 
  task_info* tr; 
  if (m_tasklist) 
  { 
    tr = m_tasklist->fetch_task(); 
    return tr; 
  } 
  return NULL; 
} 
void* thread_pool::thread_func(void* parm) 
{ 
  thread_pool *ptp = static_cast<thread_pool*> (parm); 
  task_info *task; 
  while (true) 
  { 
    task = ptp->get_task(); 
    if (task) 
    { 
      (*task->func)(task); 
      //delete task; //func Responsible for the release of task_info 
    } 
  } 
  return NULL; 
} 
int thread_pool::create_threads(int n) 
{ 
  if (n > MAXIMUM_THREAD_COUNT) 
    n = MAXIMUM_THREAD_COUNT; 
  delete_threads(); 
  for (int i = 0; i < n; i++) 
  { 
    int ret = pthread_create(&m_pids[i], NULL, thread_func, (void*) this); 
    if (ret != 0) 
      break; 
    m_thread_cnt++; 
  } 
  return m_thread_cnt; 
} 
void thread_pool::delete_threads() 
{ 
  for (int i = 0; i < m_thread_cnt; i++) 
  { 
    void* retval; 
    pthread_cancel(m_pids[i]); 
    pthread_join(m_pids[i], &retval); 
  } 
  m_thread_cnt = 0; 
} 
void thread_pool::set_tasklist(task_list* plist) 
{ 
  m_tasklist = plist; 
} 
void thread_pool::del_tasklist() 
{ 
  m_tasklist = NULL; 
}

File name: test.cpp


#include "unistd.h" 
#include "stdio.h" 
#include "stdlib.h" 
#include "task_list.h" 
#include "thread_pool.h" 
void* fun(void *parm) 
{ 
  task_info* ptk = (task_info*)parm; 
  pid_t tid = pthread_self(); 
  int count = (int)ptk->parm0; 
  printf("count=%d, tid=%dn", count, tid); 
  return NULL; 
} 
int main() 
{ 
  int count = 0; 
  thread_pool tp; 
  task_list tl; 
  tp.create_threads(4 - 1); 
  tp.set_tasklist(&tl); 
  while (1) 
  { 
    task_info* pti = NULL; 
    pti = (task_info *) malloc(sizeof(task_info)); 
    pti->func = fun; 
    pti->parm0 = (void *)count; 
    tl.append_task(pti); 
    count++; 
    sleep(2); 
  } 
// printf("hello,worldn"); 
  return 0; 
} 

Compile and run, I built the automake project with ecplise, so just modify the makefile.am to compile successfully
File name: Makefile. Am


bin_PROGRAMS=test 
test_SOURCES=test.cpp locker.h locker.cpp  
              task_list.h task_list.cpp  
              thread_pool.h thread_pool.cpp 
test_LDADD=-lpthread 

Execution results:


count=0, tid=-1219888272 
count=1, tid=-1219888272 
count=2, tid=-1228280976 
count=3, tid=-1236673680 
count=4, tid=-1219888272 
count=5, tid=-1228280976 
count=6, tid=-1236673680 
count=7, tid=-1219888272 
count=8, tid=-1228280976 
count=9, tid=-1236673680 

Hope that the article described in the C++ programming to help you.


Related articles: