C++ thread pool simple implementation method

  • 2020-04-02 02:43:09
  • OfStack

In this paper, the simple implementation of C++ thread pool is described in detail in the form of an example. Share with you for your reference. Specific methods are as follows:

I. several basic thread functions:

1. Thread manipulation function:


int pthread_create(pthread_t *tidp, const pthread_attr_t *attr, (void*)(*start_rtn)(void *), void *arg); //create
void pthread_exit(void *retval);            //Termination of their
int pthread_cancel(pthread_t tid);            //Terminate others. After sending the termination signal, the target thread does not necessarily terminate, to call the join function to wait
int pthread_join(pthread_t tid, void **retval);   //Blocks and waits for another thread

2. Properties:


int pthread_attr_init(pthread_attr_t *attr);           //Initialization property
int pthread_attr_setdetachstate(pthread_attr_t *attr, int detachstate); //Set the separate state
int pthread_attr_destroy(pthread_attr_t *attr);           //Destruction of property

 

3. Synchronization function
The mutex


int pthread_mutex_init(pthread_mutex_t *restrict mutex, const pthread_mutexattr_t *restrict attr); //Initialize the lock
int pthread_mutex_destroy(pthread_mutex_t *mutex); //Destruction of the lock
int pthread_mutex_lock(pthread_mutex_t *mutex); //lock
int pthread_mutex_trylock(pthread_mutex_t *mutex); // try lock Above, lock Non-blocking version of 
int pthread_mutex_unlock(pthread_mutex_t *mutex); //unlock

4. Conditional variables


int pthread_cond_init(pthread_cond_t *cv, const pthread_condattr_t *cattr); //Initialize the
int pthread_cond_destroy(pthread_cond_t *cond);                 //The destruction
int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);     //Waiting on the condition
int pthread_cond_signal(pthread_cond_t *cond);                 //Notification to wake up the first thread to go to sleep by calling pthread_cond_wait()

5. Utility functions


int pthread_equal(pthread_t t1, pthread_t t2); //Compare thread ID
int pthread_detach(pthread_t tid);       //Separate thread
pthread_t pthread_self(void);            //Their own ID

In the above code, the thread's cancel and join, and the last tool function, the parameters of these functions are structural variables, and the other function parameters are Pointers to structural variables. Just to taste, the arguments are Pointers, because you need to change the contents of the structure, and the arguments are ordinary variables, you just need to read the contents.

Ii. Thread pool code


#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>  //Multithreaded header file in Linux environment, non-c language standard library, compile time last to add -lpthread call dynamic link library

//The structure of the work list
typedef struct worker {
  void *(*process)(void *arg);  //Job function
  void *arg;           //Parameter of function
  struct worker *next;
}CThread_worker;

//The structure of the thread pool
typedef struct {
  pthread_mutex_t queue_lock;   //The mutex
  pthread_cond_t queue_ready;  //Condition variable/semaphore

  CThread_worker *queue_head;   //Points to the head of the work list, the critical section
  int cur_queue_size;       //Records the number of jobs in the linked list, the critical section

  int max_thread_num;       //Maximum number of threads
  pthread_t *threadid;      //Thread ID

  int shutdown;          //switch
}CThread_pool;

static CThread_pool *pool = NULL;  //A thread pool variable
int pool_add_worker(void *(*process)(void *arg), void *arg);  //Responsible for adding work to the work list
void *thread_routine(void *arg);  //Thread routines

//Thread pool initialization
void pool_init(int max_thread_num)
{
  int i = 0;

  pool = (CThread_pool *) malloc (sizeof(CThread_pool));  //Creating a thread pool

  pthread_mutex_init(&(pool->queue_lock), NULL);   //The mutex Initializes with the address of the lock as the parameter 
  pthread_cond_init( &(pool->queue_ready), NULL);   //The condition variable is initialized and the parameter is the variable address

  pool->queue_head = NULL;
  pool->cur_queue_size = 0;

  pool->max_thread_num = max_thread_num;
  pool->threadid = (pthread_t *) malloc(max_thread_num * sizeof(pthread_t));
  for (i = 0; i < max_thread_num; i++) {
    pthread_create(&(pool->threadid[i]), NULL, thread_routine, NULL); // Create a thread ,  Parameters for Thread ID Variable address, property, routine, parameter 
  }

  pool->shutdown = 0;
}

// Routines that call specific Job function
void *thread_routine(void *arg)
{
  printf("starting thread 0x%xn", (int)pthread_self());
  while(1) {
    pthread_mutex_lock(&(pool->queue_lock));  // To take a job from a linked list, add it first The mutex , the parameter is the lock address 

    while(pool->cur_queue_size == 0 && !pool->shutdown) {    //The list is empty
      printf("thread 0x%x is waitingn", (int)pthread_self());
      pthread_cond_wait(&(pool->queue_ready), &(pool->queue_lock));  //Wait for a resource, and the semaphore is used for notification. The lock for the second parameter is released for addition. The function returns with a new lock.
    }

    if(pool->shutdown) {
      pthread_mutex_unlock(&(pool->queue_lock));     // The end of the switch Opens, releases the lock, and exits the thread 
      printf("thread 0x%x will exitn", (int)pthread_self());
      pthread_exit(NULL);   //The argument is void star
    }

    printf("thread 0x%x is starting to workn", (int)pthread_self());

    --pool->cur_queue_size;
    CThread_worker *worker = pool->queue_head;
    pool->queue_head = worker->next;

    pthread_mutex_unlock (&(pool->queue_lock));   //After obtaining a work release lock
    (*(worker->process))(worker->arg);   //Do the work
    free(worker);
    worker = NULL;
  }
  pthread_exit(NULL);
}

//Destroy thread pool
int pool_destroy()
{
  if(pool->shutdown)   // End of the test switch On or off, if on, all threads will exit automatically 
    return -1;
  pool->shutdown = 1;

  pthread_cond_broadcast( &(pool->queue_ready) );   //Broadcast, wake up all threads, ready to exit

  int i;
  for(i = 0; i < pool->max_thread_num; ++i)
    pthread_join(pool->threadid[i], NULL);   //The main thread waits for all threads to exit, only join the first parameter is not a pointer, the second parameter type is void **, receive the return value of exit, need to cast
  free(pool->threadid);
  CThread_worker *head = NULL;
  while(pool->queue_head != NULL) {      //Release the remaining nodes of the unexecuted work list
    head = pool->queue_head;
    pool->queue_head = pool->queue_head->next;
    free(head);
  }

  pthread_mutex_destroy(&(pool->queue_lock));   //Destroy locks and condition variables
  pthread_cond_destroy(&(pool->queue_ready));

  free(pool);
  pool=NULL;
  return 0;
}

void *myprocess(void *arg)
{
  printf("threadid is 0x%x, working on task %dn", (int)pthread_self(), *(int*)arg);
  sleep (1);
  return NULL;
}

//Add the work
int pool_add_worker(void *(*process)(void *arg), void *arg)
{
  CThread_worker *newworker = (CThread_worker *) malloc(sizeof(CThread_worker));
  newworker->process = process;  // specific Job function
  newworker->arg = arg;
  newworker->next = NULL;

  pthread_mutex_lock( &(pool->queue_lock) );   //lock

  CThread_worker *member = pool->queue_head;   //Insert the end of the list
  if( member != NULL ) {
    while( member->next != NULL )
      member = member->next;
    member->next = newworker;
  }
  else {
    pool->queue_head = newworker;
  }
  ++pool->cur_queue_size;

  pthread_mutex_unlock( &(pool->queue_lock) );  //unlock

  pthread_cond_signal( &(pool->queue_ready) );  //Notifies a waiting thread
  return 0;
}

int main(int argc, char **argv)
{
  pool_init(3);  // The main thread Creating a thread pool . 3 A thread 

  int *workingnum = (int *) malloc(sizeof(int) * 10);
  int i;
  for(i = 0; i < 10; ++i) {
    workingnum[i] = i;
    pool_add_worker(myprocess, &workingnum[i]);   // Main thread responsibility Add the work . 10 A job 
  }

  sleep (5);
  pool_destroy();   //Destroy thread pool
  free (workingnum);

  return 0;
}

Hope that the article described in the C++ programming to help you.


Related articles: