Implementing a thread pool in C that supports dynamic expansion and destruction

  • 2020-05-07 20:11:14
  • OfStack

This article presents a thread pool implementation in C that supports dynamically adding and destroying threads. It is shared here for your reference; the details are as follows.

Features

1. Initializes a specified number of threads. 2. Uses a linked list to manage the task queue. 3. Supports dynamically adding threads. 4. Dynamically destroys some threads when too many of them are idle.

#include <pthread.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
 

/* One queued task: a function plus its argument. Tasks are chained into a singly linked list. */
typedef struct thread_worker_s{
  void *(*process)(void *arg); // Function that performs the task
  void *arg;          // Argument handed to process
  struct thread_worker_s *next; // Next task in the queue; NULL for the last one
}thread_worker_t;
 
/* Minimal boolean type for this file: bool is a plain int, true/false are 1/0 */
#define bool int
#define true 1
#define false 0
 
/* State codes stored in thread_info_t.state; the values are also used as indices into thread_state_map */
#define THREAD_STATE_RUN        0   /* thread has just been created */
#define THREAD_STATE_TASK_WAITING   1   /* worker is waiting for a task */
#define THREAD_STATE_TASK_PROCESSING  2   /* worker is executing a task */
#define THREAD_STATE_TASK_FINISHED   3   /* worker has finished a task */
#define THREAD_STATE_EXIT       4   /* thread has exited or is exiting */
 
 
/* Bookkeeping record for one pool thread; records form a singly linked list headed by thread_pool_t.threads */
typedef struct thread_info_s{
  pthread_t id;                 /* thread id as filled in by pthread_create */
  int    state;                 /* one of the THREAD_STATE_* codes */
  struct thread_info_s *next;   /* next record in the list */
}thread_info_t;
 
/*
 * Human-readable label for each THREAD_STATE_* code, indexed by the code
 * itself (0..4). The entries point at string literals, so both the strings
 * and the table are const-qualified.
 */
static const char *const thread_state_map[] ={" create "," Wait for the task "," In the processing "," Processing is complete "," Have withdrawn from the "};
/* When the pool is shrunk, only threads in states 0, 1, 2 or 4 may be destroyed */
 
 
/* Thread pool manager: thread-to-task ratio thresholds */
#define THREAD_BUSY_PERCENT 0.5  /* thread : task = 1:2. thread_pool_is_need_extend adds threads when rnum/queue_size drops below this value */
#define THREAD_IDLE_PERCENT 2   /* thread : task = 2:1, i.e. more threads than tasks; meant as the shrink threshold but not referenced in the code visible here */
 
/*
 * The thread pool itself: the task queue, its lock and condition variable,
 * the list of worker threads and the ids of the helper threads.
 * queue_lock is the mutex the pool functions take around the queue, the
 * counters and the thread list.
 */
typedef struct thread_pool_s{
  pthread_mutex_t queue_lock ; // Mutex taken around the task queue and pool bookkeeping
  pthread_cond_t queue_ready; // Condition variable workers wait on; signalled when a task is added, broadcast on destruction

  thread_worker_t *head   ;    // Head of the task queue (singly linked list)
  bool    is_destroy   ;    // Set once thread_pool_destory has been called
  int num;                  // Total number of worker threads created so far
  int rnum;                 // Worker threads counted as running
  int knum;                 // Worker threads counted as destroyed/exited
  int queue_size       ;    // Number of tasks currently queued
  thread_info_t *threads   ;    // Head of the linked list of worker thread records
  pthread_t   display   ;    // Id of the statistics printing thread (display_thread)
  pthread_t   destroy   ;    // Id of the thread that periodically shrinks the pool (thread_pool_is_need_recovery)
  pthread_t   extend    ;    // Reserved for an extend thread; its creation is commented out in thread_pool_create
  float percent       ;    // Ratio of threads to tasks (rnum/queue_size); not referenced in the code visible here
  int  init_num      ;    // Thread count requested at creation; the pool is shrunk back towards it
  pthread_cond_t  extend_ready     ;    // Intended to signal that threads should be added; only referenced in commented-out code
}thread_pool_t;
 
/*------------------------- Function declarations ----------------------*/
/**
 * Creates a thread pool:
 * 1. Initializes the queue mutex
 * 2. Initializes the queue condition variable
 * 3. Creates the requested number of worker threads
 * Returns NULL when num < 1 or when an allocation fails.
 */
thread_pool_t* thread_pool_create(int num);
/* Entry function of every worker thread; arg is the owning thread_pool_t */
void *thread_excute_route(void *arg);
 
 
/* Debug helper: writes message and a newline to stdout, but only when flag is non-zero */
void debug(char *message,int flag){
  if(!flag){
    return;
  }
  printf("%s\n",message);
}
 
/* Entry function of the thread that periodically prints pool statistics; arg is the thread_pool_t */
void *display_thread(void *arg);
/**
 * Adds a task to the pool:
 * 1. Appends the task to the end of the queue
 * 2. Wakes a waiting worker thread with pthread_cond_signal()
 * Returns 1 on success and -1 on failure.
*/
int thread_pool_add_worker(thread_pool_t *pool,void*(*process)(void *arg),void *arg); // Queues a function to be executed by the pool; such a queued function is called a task
 
/**
 * Destroying the thread pool is meant to consist of the following steps:
 * 1. Wake all waiting worker threads with pthread_cond_broadcast()
 * 2. Wait for all threads to finish executing
 * 3. Destroy the task list
 * 4. Destroy the mutex and the condition variable
 * 5. Destroy the thread pool object
 *    NOTE(review): the thread_pool_destory definition in this file does not
 *    free the thread_pool_t itself - confirm who is expected to do that.
 */
 
 
 
/* Entry function of the thread that shrinks the pool back towards its initial size; arg is the thread_pool_t */
void *thread_pool_is_need_recovery(void *arg);
/* Adds worker threads when the task backlog is large; arg is the thread_pool_t */
void *thread_pool_is_need_extend(void *arg);
/* Shuts the pool down (see the steps listed above) */
void thread_pool_destory(thread_pool_t *pool);
 
 
thread_pool_t *thread_pool_create(int num){
  if(num<1){
    return NULL;
  }
  thread_pool_t *p;
  p = (thread_pool_t*)malloc(sizeof(struct thread_pool_s));
  if(p==NULL)
    return NULL;
  p->init_num = num;
  /* Initializes mutually exclusive and conditional variables */
  pthread_mutex_init(&(p->queue_lock),NULL);
  pthread_cond_init(&(p->queue_ready),NULL);
 
  /* Set the number of threads */
  p->num  = num; 
  p->rnum = num;
  p->knum = 0;
 
  p->head = NULL;
  p->queue_size =0; 
  p->is_destroy = false;
 
   
  int i=0;
  thread_info_t *tmp=NULL;
  for(i=0;i<num;i++){
    /* Create a thread */
    tmp= (struct thread_info_s*)malloc(sizeof(struct thread_info_s));
    if(tmp==NULL){
      free(p);
      return NULL;
    }else{
      tmp->next = p->threads;
      p->threads = tmp;
    }
    pthread_create(&(tmp->id),NULL,thread_excute_route,p);
    tmp->state = THREAD_STATE_RUN;
  }
 
  /* According to */
  pthread_create(&(p->display),NULL,display_thread,p);
  /* Detects whether dynamic threads are required */
  //pthread_create(&(p->extend),NULL,thread_pool_is_need_extend,p);
  /* Dynamic destruction */
  pthread_create(&(p->destroy),NULL,thread_pool_is_need_recovery,p);
  return p;
}
 
/*
 * Queues a task for execution by the pool.
 *
 * pool:    pool to add the task to.
 * process: function that performs the task.
 * arg:     argument passed to process; ownership stays with the caller.
 *
 * Returns 1 when the task was queued. Returns -1 when pool or process is
 * NULL, when memory for the task cannot be allocated, or when the pool has
 * already been marked as destroyed (such a task would never be executed).
 */
int thread_pool_add_worker(thread_pool_t *pool,void*(*process)(void*arg),void*arg){
  thread_worker_t *worker=NULL;
  thread_worker_t **tail=NULL;

  if(pool==NULL || process==NULL){
    return -1;
  }
  worker = malloc(sizeof *worker);
  if(worker==NULL){
    return -1;
  }
  worker->process = process;
  worker->arg   = arg;
  worker->next  = NULL;

  /* Give the pool a chance to grow before the new task is queued; called
     without the queue lock held because the callee takes it itself */
  thread_pool_is_need_extend(pool);

  pthread_mutex_lock(&(pool->queue_lock));
  if(pool->is_destroy){
    pthread_mutex_unlock(&(pool->queue_lock));
    free(worker);
    return -1;
  }
  /* Walk to the link that terminates the list and hang the task there */
  tail = &(pool->head);
  while(*tail!=NULL)
    tail = &((*tail)->next);
  *tail = worker;
  pool->queue_size ++;
  pthread_mutex_unlock(&(pool->queue_lock));

  /* Wake one waiting worker */
  pthread_cond_signal(&(pool->queue_ready));
  return 1;
}
 
 
/*
 * Joins every worker thread recorded in the pool's thread list and marks its
 * record as exited.
 *
 * pool->threads is a linked list, so it is walked through the next pointers.
 * The function blocks until each worker has returned, so the workers must
 * already have been told to stop (thread_pool_destory does that).
 */
void thread_pool_wait(thread_pool_t *pool){
  thread_info_t *thread=NULL;

  if(pool==NULL)
    return ;
  for(thread=pool->threads;thread!=NULL;thread=thread->next){
    pthread_join(thread->id,NULL);
    thread->state = THREAD_STATE_EXIT;
  }
}
/*
 * Shuts the pool down:
 * 1. marks the pool as destroyed and wakes every waiting worker
 * 2. stops the display and recovery helper threads
 * 3. waits for all workers to finish
 * 4. frees the remaining tasks and the thread records
 * 5. destroys the mutex and the condition variable
 *
 * The thread_pool_t allocation itself is not freed here. A second call on an
 * already destroyed pool returns immediately.
 * NOTE(review): assumes pool->display and pool->destroy hold the ids of
 * threads that thread_pool_create started successfully.
 */
void thread_pool_destory(thread_pool_t *pool){
  thread_pool_t  *p   = pool;
  thread_worker_t *member = NULL;
  thread_info_t *tmp=NULL;

  if(p==NULL)
    return ;
  /* Unlocked check so that a repeated call never touches the destroyed mutex */
  if(p->is_destroy)
    return ;

  /* Flag and broadcast under the lock: a worker that has tested the flag but
     has not started waiting yet would otherwise miss the wakeup forever */
  pthread_mutex_lock(&(p->queue_lock));
  if(p->is_destroy){
    pthread_mutex_unlock(&(p->queue_lock));
    return ;
  }
  p->is_destroy = true;
  pthread_cond_broadcast(&(p->queue_ready));
  pthread_mutex_unlock(&(p->queue_lock));

  /* The helper threads loop forever on this pool; stop them before the
     thread list and the mutex go away */
  pthread_cancel(p->display);
  pthread_cancel(p->destroy);
  pthread_join(p->display,NULL);
  pthread_join(p->destroy,NULL);

  thread_pool_wait(pool);

  /* Destroy task list */
  while(p->head){
    member = p->head;
    p->head = member->next;
    free(member);
  }
  p->queue_size = 0;
  /* Destroy thread list: every record, not only the head */
  while(p->threads){
    tmp = p->threads;
    p->threads = tmp->next;
    free(tmp);
  }

  pthread_mutex_destroy(&(p->queue_lock));
  pthread_cond_destroy(&(p->queue_ready));
  return ;
}
/*
 * Looks up the record of the thread with the given id.
 *
 * Returns the matching thread_info_t from pool->threads, or NULL when pool
 * is NULL or no record carries that id. The list is read without taking any
 * lock here; callers that need a stable list must hold the queue lock.
 */
thread_info_t *get_thread_by_id(thread_pool_t *pool,pthread_t id){
  thread_info_t *p=NULL;

  if(pool==NULL)
    return NULL;
  for(p=pool->threads;p!=NULL;p=p->next){
    /* pthread_t is an opaque type; pthread_equal is the portable comparison */
    if(pthread_equal(p->id,id))
      return p;
  }
  return NULL;
}
 
 
/* Each thread entry function */
void *thread_excute_route(void *arg){
  thread_worker_t *worker = NULL;
  thread_info_t  *thread = NULL; 
  thread_pool_t*  p = (thread_pool_t*)arg;
  //printf("thread %lld create success\n",pthread_self());
  while(1){
    pthread_mutex_lock(&(p->queue_lock));
 
    /* Gets the current thread's id*/
    pthread_t pthread_id = pthread_self();
    /* Set the current state */
    thread = get_thread_by_id(p,pthread_id);
 
    /* The thread pool is destroyed , And there are no tasks */
    if(p->is_destroy==true && p->queue_size ==0){
      pthread_mutex_unlock(&(p->queue_lock));
      thread->state = THREAD_STATE_EXIT;
      p->knum ++;
      p->rnum --;
      pthread_exit(NULL);
    }
    if(thread){
      thread->state = THREAD_STATE_TASK_WAITING; /* The thread is waiting for the task */
    }
    /* The thread pool is not destroyed , No mission comes 1 Straight wait */
    while(p->queue_size==0 && !p->is_destroy){
      pthread_cond_wait(&(p->queue_ready),&(p->queue_lock));
    }
    p->queue_size--;
    worker = p->head;
    p->head = worker->next;
    pthread_mutex_unlock(&(p->queue_lock));
     
 
    if(thread)
      thread->state = THREAD_STATE_TASK_PROCESSING; /* The thread is executing the task */
    (*(worker->process))(worker->arg);
    if(thread)
      thread->state = THREAD_STATE_TASK_FINISHED;  /* Mission accomplished */
    free(worker);
    worker = NULL;
  }
}
 
 
 
/* Expand the thread */
void *thread_pool_is_need_extend(void *arg){
  thread_pool_t *p = (thread_pool_t *)arg;
  thread_pool_t *pool = p;
  /* Determines whether additional threads are needed , The ultimate goal   thread : task =1:2*/
  if(p->queue_size>100){
    int incr =0;
    if(((float)p->rnum/p->queue_size) < THREAD_BUSY_PERCENT ){
      incr = (p->queue_size*THREAD_BUSY_PERCENT) - p->rnum; /* The computation needs to increase the number of threads */
      int i=0;
      thread_info_t *tmp=NULL;
      thread_pool_t *p = pool;
      pthread_mutex_lock(&pool->queue_lock);
      if(p->queue_size<100){
        pthread_mutex_unlock(&pool->queue_lock);
        return ;
      }
      for(i=0;i<incr;i++){
        /* Create a thread */
        tmp= (struct thread_info_s*)malloc(sizeof(struct thread_info_s));
        if(tmp==NULL){
          continue;
        }else{
          tmp->next = p->threads;
          p->threads = tmp;
        }
        p->num ++;
        p->rnum ++;
        pthread_create(&(tmp->id),NULL,thread_excute_route,p);
        tmp->state = THREAD_STATE_RUN;
      }
      pthread_mutex_unlock(&pool->queue_lock);
    }
  }
  //pthread_cond_signal(&pool->extend_ready);
}
pthread_cond_t sum_ready;
/* Restores the initial number of threads */
void *thread_pool_is_need_recovery(void *arg){
  thread_pool_t *pool = (thread_pool_t *)arg;
  int i=0;
  thread_info_t *tmp = NULL,*prev=NULL,*p1=NULL;
  /* If there is no task , The current thread is greater than the number of initialized threads */
  while(1){
    i=0;
    if(pool->queue_size==0 && pool->rnum > pool->init_num ){
      sleep(5);
      /*5s If I'm still in this state for a second , Destroy some threads */
      if(pool->queue_size==0 && pool->rnum > pool->init_num ){
        pthread_mutex_lock(&pool->queue_lock);
        tmp = pool->threads;
        while((pool->rnum != pool->init_num) && tmp){
          /* Find the idle thread */
          if(tmp->state != THREAD_STATE_TASK_PROCESSING){
            i++;
            if(prev)
              prev->next  = tmp->next;
            else
              pool->threads = tmp->next;
            pool->rnum --; /* Running thread minus 1*/
            pool->knum ++; /* Destroy the thread plus 1*/
            kill(tmp->id,SIGKILL); /* Destruction of the thread */
            p1 = tmp;
            tmp = tmp->next;
            free(p1);
            continue;
          }
          prev = tmp;
          tmp = tmp->next;
        }
        pthread_mutex_unlock(&pool->queue_lock);
        printf("5s There is no new task to destroy some threads , destroyed  %d  A thread \n",i);
      }
    }
    sleep(5);
  }
}
 
 
 
/* print 1 Some of the information */
void *display_thread(void *arg){
  thread_pool_t *p =(thread_pool_t *)arg;
  thread_info_t *thread = NULL;
  int i=0;
  while(1){
    printf("threads %d,running %d,killed %d\n",p->num,p->rnum,p->knum);  /* The total number of threads , Is running , Has been destroyed */
    thread = p->threads;
    while(thread){
      printf("id=%ld,state=%s\n",thread->id,thread_state_map[thread->state]);
      thread = thread->next;
    }
    sleep(5);
  }
}

I hope this article helps you learn C programming.


Related articles: