C++

C++ thread pool implementation method


This article presents an implementation of a thread pool in C++. It is shared here for your reference; the details are as follows:

The following thread pool is the one I use in my own work. The principle is to set up a task queue and let multiple threads take tasks out of the queue under mutual exclusion and then execute them; naturally, access to the queue has to be protected by a lock.

Environment: ubuntu Linux

File name: locker.h

#ifndef LOCKER_H_
#define LOCKER_H_
#include "pthread.h"
// Small wrapper around a pthread mutex. The constructor initialises the mutex
// and the destructor destroys it; lock()/unlock() forward to the matching
// pthread calls.
class locker
{
public:
  locker();
  virtual ~locker();
  // Acquires the mutex. Returns true when pthread_mutex_lock() reports success.
  bool lock();
  // Releases the mutex.
  void unlock();
private:
  // Copying is disabled: a byte-wise copy of a pthread_mutex_t is not a usable
  // mutex, and both copies would call pthread_mutex_destroy(). The two members
  // below are declared but intentionally never defined.
  locker(const locker&);
  locker& operator=(const locker&);

  pthread_mutex_t   m_mutex;
};
#endif

File name: locker.cpp

#include "locker.h"
// Initialise the wrapped mutex, passing a null attribute pointer.
// The return code of pthread_mutex_init() is not inspected.
locker::locker()
{
  (void)pthread_mutex_init(&m_mutex, 0);
}
// Destroy the wrapped mutex. The return code is not inspected.
locker::~locker()
{
  (void)pthread_mutex_destroy(&m_mutex);
}
// Acquire the mutex.
// Returns true when pthread_mutex_lock() returns 0, false for any other code.
bool locker::lock()
{
  return pthread_mutex_lock(&m_mutex) == 0;
}
// Release the mutex. The return code of pthread_mutex_unlock() is not inspected.
void locker::unlock()
{
  (void)pthread_mutex_unlock(&m_mutex);
}

File name: task_list.h

#ifndef TASK_LIST_H_
#define TASK_LIST_H_
#include "list"
#include "locker.h"
#include "netinet/in.h"
#include "semaphore.h"
using namespace std;

// Entry point of a task: takes one opaque pointer and returns one.
typedef void* (*THREAD_FUNC)(void*);

// One unit of work queued for the thread pool.
//
// thread_pool::thread_func() calls func with a pointer to the task_info itself,
// so func reads its arguments from parm0..parm2.
//
// The struct does not enforce what the three parameters mean. According to the
// original author's notes, for downstream (send) tasks parm0 points to the
// object sending the data, parm1 points to the data and parm2 carries the data
// length -- NOTE(review): confirm against real callers; test.cpp stores a plain
// counter in parm0.
struct task_info
{
  THREAD_FUNC func;   // function executed by a worker thread
  void* parm0;        // first user-defined argument
  void* parm1;        // second user-defined argument
  void* parm2;        // third user-defined argument
};

// Queue of pending tasks and its iterator type.
typedef std::list<task_info*> TASK_LIST;
typedef TASK_LIST::iterator PTASK_LIST;
// Task queue shared between producers and worker threads.
//
// The list is guarded by m_lk, and m_sem is posted once per appended task so
// that fetch_task() can block until work is available. The destructor deletes
// any task still queued.
class task_list
{
public:
  task_list();
  virtual ~task_list();
  // Appends tsk to the back of the queue and posts the semaphore.
  void append_task(task_info* tsk);
  // Waits on the semaphore, then removes and returns the task at the front.
  task_info* fetch_task();
private:
  // Copying is disabled: a copy would duplicate the raw task pointers (both
  // destructors would delete them) and byte-copy the semaphore. The two members
  // below are declared but intentionally never defined.
  task_list(const task_list&);
  task_list& operator=(const task_list&);

  TASK_LIST m_tasklist;   // pending tasks, oldest first
  locker m_lk;            // guards m_tasklist
  sem_t m_sem;            // posted once per appended task
};
#endif

File name: task_list.cpp

#include "task_list.h"
// Start with an empty queue and a semaphore whose initial count is zero,
// matching the zero tasks queued at this point.
task_list::task_list()
  : m_tasklist()
{
  // Arguments: semaphore, pshared = 0, initial value = 0.
  sem_init(&m_sem, 0, 0);
}
// Release every task that was queued but never fetched, then tear down the
// semaphore. Leftover tasks are released with `delete`.
task_list::~task_list()
{
  for (PTASK_LIST it = m_tasklist.begin(); it != m_tasklist.end(); ++it)
    delete *it;   // deleting a null pointer is a no-op, so no check is needed
  m_tasklist.clear();

  sem_destroy(&m_sem);
}
// Queue tsk at the back of the list and signal that one more task is available.
void task_list::append_task(task_info* tsk)
{
  {
    // Critical section: the list is only touched while m_lk is held.
    m_lk.lock();
    m_tasklist.push_back(tsk);
    m_lk.unlock();
  }

  // Post after releasing the lock; each post pairs with one sem_wait()
  // in fetch_task().
  sem_post(&m_sem);
}
// Block until a task is available, then remove it from the front of the queue
// and return it.
//
// Returns NULL when the wait on the semaphore fails (sem_wait() may return
// early, e.g. when interrupted by a signal handler) or when the queue turns out
// to be empty. Callers must be prepared for a NULL result;
// thread_pool::thread_func() already skips NULL tasks and simply asks again.
task_info* task_list::fetch_task()
{
  // A failed wait did not consume a post, so the list may be empty: do not
  // touch it.
  if (sem_wait(&m_sem) != 0)
    return NULL;

  task_info* tr = NULL;
  m_lk.lock();
  // front()/pop_front() on an empty list is undefined behaviour; guard it.
  if (!m_tasklist.empty())
  {
    tr = m_tasklist.front();
    m_tasklist.pop_front();
  }
  m_lk.unlock();
  return tr;
}

File name: thread_pool.h

#ifndef THREAD_POOL_H_
#define THREAD_POOL_H_
#include "task_list.h"
#include "pthread.h"
// Number of worker threads created when create_threads() is called without an
// argument.
#define DEFAULT_THREAD_COUNT  4
// Upper bound on the number of worker threads; also the size of the thread-id
// array inside thread_pool.
#define MAXIMUM_THREAD_COUNT  1000

// Fixed-capacity pool of pthreads. Each worker repeatedly fetches a task from
// the attached task_list and runs it. The pool does not own the task_list; it
// only stores the pointer given to set_tasklist().
class thread_pool
{
public:
  thread_pool();
  // Cancels and joins all worker threads.
  virtual ~thread_pool();
  // Stops any existing workers, then starts up to n new ones (capped at
  // MAXIMUM_THREAD_COUNT). Returns the number of threads actually created.
  int create_threads(int n = DEFAULT_THREAD_COUNT);
  // Cancels and joins every worker thread.
  void delete_threads();
  // Attaches the queue the workers fetch tasks from.
  void set_tasklist(task_list* plist);
  // Detaches the queue (workers then get no tasks).
  void del_tasklist();
protected:
  // pthread entry point; parm is the owning thread_pool.
  static void* thread_func(void* parm);
  // Fetches the next task from the attached queue, or NULL if none is attached.
  task_info* get_task();
private:
  // Copying is disabled: a copy would hold the same thread ids, so both
  // destructors would cancel and join the same threads. The two members below
  // are declared but intentionally never defined.
  thread_pool(const thread_pool&);
  thread_pool& operator=(const thread_pool&);

  int       m_thread_cnt;                      // number of running workers
  pthread_t    m_pids[MAXIMUM_THREAD_COUNT];   // ids of the running workers
  task_list*   m_tasklist;                     // attached queue (not owned), may be NULL
};
#endif

File name: thread_pool.cpp

#include "thread_pool.h"
// A new pool has no worker threads and no task queue attached.
thread_pool::thread_pool()
  : m_thread_cnt(0),
    m_tasklist(NULL)
{
}
// Stop all workers before the pool's storage goes away.
thread_pool::~thread_pool()
{
  this->delete_threads();
}
// Fetch the next task from the attached queue.
// Returns NULL immediately when no queue is attached; otherwise returns
// whatever task_list::fetch_task() returns.
task_info* thread_pool::get_task()
{
  if (!m_tasklist)
    return NULL;
  return m_tasklist->fetch_task();
}
// Worker thread entry point.
//
// parm is the owning thread_pool (passed by create_threads()). The loop never
// exits on its own; the thread ends when delete_threads() cancels it.
//
// Each task's func receives a pointer to the task_info itself and is
// responsible for releasing it.
void* thread_pool::thread_func(void* parm)
{
  thread_pool *ptp = static_cast<thread_pool*> (parm);
  while (true)
  {
    // Explicit cancellation point. When no task list is attached, get_task()
    // returns NULL without blocking, and the loop would otherwise contain no
    // cancellation point at all, so pthread_cancel() + pthread_join() in
    // delete_threads() could hang forever.
    pthread_testcancel();

    task_info *task = ptp->get_task();
    if (task)
    {
      (*task->func)(task);
      //delete task; //func is responsible for releasing the task_info
    }
    else
    {
      // Nothing to run: give up the CPU instead of spinning flat out.
      sched_yield();
    }
  }
  return NULL;
}
// (Re)start the pool with up to n worker threads.
//
// n is capped at MAXIMUM_THREAD_COUNT. Any existing workers are stopped first.
// Creation stops at the first pthread_create() failure.
// Returns the number of threads actually running afterwards.
int thread_pool::create_threads(int n)
{
  const int wanted = (n > MAXIMUM_THREAD_COUNT) ? MAXIMUM_THREAD_COUNT : n;

  delete_threads();   // leaves m_thread_cnt at 0

  // m_thread_cnt doubles as the index of the next free slot in m_pids.
  while (m_thread_cnt < wanted)
  {
    if (pthread_create(&m_pids[m_thread_cnt], NULL, thread_func, this) != 0)
      break;
    ++m_thread_cnt;
  }
  return m_thread_cnt;
}
// Cancel every worker thread and wait for it to finish, in creation order,
// then mark the pool as empty.
void thread_pool::delete_threads()
{
  pthread_t* const last = m_pids + m_thread_cnt;
  for (pthread_t* p = m_pids; p != last; ++p)
  {
    pthread_cancel(*p);
    pthread_join(*p, NULL);   // the thread's exit value is not needed
  }
  m_thread_cnt = 0;
}
// Attach the queue that worker threads fetch tasks from.
// Only the pointer is stored. The assignment is not synchronised with the
// worker threads that read m_tasklist.
void thread_pool::set_tasklist(task_list* plist)
{
  this->m_tasklist = plist;
}
void thread_pool::del_tasklist()
{
  m_tasklist = NULL;
}

File name: test.cpp

#include "unistd.h"
#include "stdio.h"
#include "stdlib.h"
#include "task_list.h"
#include "thread_pool.h"
void* fun(void *parm)
{
  task_info* ptk = (task_info*)parm;
  pid_t tid = pthread_self();
  int count = (int)ptk->parm0;
  printf("count=%d, tid=%dn", count, tid);
  return NULL;
}
int main()
{
  int count = 0;
  thread_pool tp;
  task_list tl;
  tp.create_threads(4 - 1);
  tp.set_tasklist(&tl);
  while (1)
  {
    task_info* pti = NULL;
    pti = (task_info *) malloc(sizeof(task_info));
    pti->func = fun;
    pti->parm0 = (void *)count;
    tl.append_task(pti);
    count++;
    sleep(2);
  }
// printf("hello,worldn");
  return 0;
}

Compile and run: I built an automake project with Eclipse, so it is only necessary to modify Makefile.am for the project to compile successfully. File name: Makefile.am

bin_PROGRAMS=test
test_SOURCES=test.cpp locker.h locker.cpp \
              task_list.h task_list.cpp \
              thread_pool.h thread_pool.cpp
test_LDADD=-lpthread

Execution results:

count=0, tid=-1219888272
count=1, tid=-1219888272
count=2, tid=-1228280976
count=3, tid=-1236673680
count=4, tid=-1219888272
count=5, tid=-1228280976
count=6, tid=-1236673680
count=7, tid=-1219888272
count=8, tid=-1228280976
count=9, tid=-1236673680

I hope this article is helpful for your C++ programming.