The C language implements a thread pool that supports dynamic expansion and destruction
- 2020-05-07 20:11:14
- OfStack
This article introduces the implementation of C language thread pool, support dynamic expansion and destruction, to share with you for your reference, the specific content is as follows
Realize the function
#include <stdio.h>
#include <pthread.h>
#include <stdlib.h>
#include <signal.h>
/* A thread's task queue is formed by , Functions and arguments , Tasks are managed by linked lists */
typedef struct thread_worker_s{
void *(*process)(void *arg); // The processing function
void *arg; // parameter
struct thread_worker_s *next;
}thread_worker_t;
#define bool int
#define true 1
#define false 0
/* State description of each thread in the thread pool */
#define THREAD_STATE_RUN 0
#define THREAD_STATE_TASK_WAITING 1
#define THREAD_STATE_TASK_PROCESSING 2
#define THREAD_STATE_TASK_FINISHED 3
#define THREAD_STATE_EXIT 4
typedef struct thread_info_s{
pthread_t id;
int state;
struct thread_info_s *next;
}thread_info_t;
static char* thread_state_map[] ={" create "," Wait for the task "," In the processing "," Processing is complete "," Have withdrawn from the "};
/* Only when the thread is compressed 0,1,2,4 Threads of state can be destroyed */
/* Thread pool manager */
#define THREAD_BUSY_PERCENT 0.5 /* thread : task = 1:2 The smaller the value, the more tasks , Increase the thread */
#define THREAD_IDLE_PERCENT 2 /* thread : task = 2:1 Value is greater than 1, More threads than tasks , Destroy some threads */
typedef struct thread_pool_s{
pthread_mutex_t queue_lock ; // Queue mutex , Locks are required when queue modifications are involved
pthread_cond_t queue_ready; // Queue conditional lock , The queue satisfies a condition, triggering the thread waiting for that condition to continue execution , Let's say the queue is full , The queue is empty
thread_worker_t *head ; // Task queue header pointer
bool is_destroy ; // Whether the thread pool has been destroyed
int num; // Number of threads
int rnum; ; // Running thread
int knum; ; // Killed threads
int queue_size ; // The size of the work queue
thread_info_t *threads ; // Thread group id, through pthread_join(thread_ids[0],NULL) To execute the thread
pthread_t display ; // Print the thread
pthread_t destroy ; // A thread that periodically destroys threads id
pthread_t extend ;
float percent ; // The ratio of threads to tasks rnum/queue_size
int init_num ;
pthread_cond_t extend_ready ; // If you want to add threads
}thread_pool_t;
/*------------------------- Function declaration ----------------------*/
/**
* 1. Initialize the mutually exclusive variable
* 2. Initialize the wait variable
* 3. Creates a specified number of threads
*/
thread_pool_t* thread_pool_create(int num);
void *thread_excute_route(void *arg);
/* Debug function */
void debug(char *message,int flag){
if(flag)
printf("%s\n",message);
}
void *display_thread(void *arg);
/**
* The add task consists of the following operations
* 1. Adds the task to the end of the queue
* 2. Notifies the waiting process to process the task pthread_cond_singal();
*/
int thread_pool_add_worker(thread_pool_t *pool,void*(*process)(void *arg),void *arg); // Net thread pool in the queue 1 The functions that need to be executed are called tasks
/**
* Destroy thread pool , It includes the following sections
* 1. Notify all waiting processes pthread_cond_broadcase
* 2. Wait for all threads to finish executing
* 3. Destroy task list
* 4. Release lock, release condition
* 4. Destroys the thread pool object
*/
void *thread_pool_is_need_recovery(void *arg);
void *thread_pool_is_need_extend(void *arg);
void thread_pool_destory(thread_pool_t *pool);
thread_pool_t *thread_pool_create(int num){
if(num<1){
return NULL;
}
thread_pool_t *p;
p = (thread_pool_t*)malloc(sizeof(struct thread_pool_s));
if(p==NULL)
return NULL;
p->init_num = num;
/* Initializes mutually exclusive and conditional variables */
pthread_mutex_init(&(p->queue_lock),NULL);
pthread_cond_init(&(p->queue_ready),NULL);
/* Set the number of threads */
p->num = num;
p->rnum = num;
p->knum = 0;
p->head = NULL;
p->queue_size =0;
p->is_destroy = false;
int i=0;
thread_info_t *tmp=NULL;
for(i=0;i<num;i++){
/* Create a thread */
tmp= (struct thread_info_s*)malloc(sizeof(struct thread_info_s));
if(tmp==NULL){
free(p);
return NULL;
}else{
tmp->next = p->threads;
p->threads = tmp;
}
pthread_create(&(tmp->id),NULL,thread_excute_route,p);
tmp->state = THREAD_STATE_RUN;
}
/* According to */
pthread_create(&(p->display),NULL,display_thread,p);
/* Detects whether dynamic threads are required */
//pthread_create(&(p->extend),NULL,thread_pool_is_need_extend,p);
/* Dynamic destruction */
pthread_create(&(p->destroy),NULL,thread_pool_is_need_recovery,p);
return p;
}
int thread_pool_add_worker(thread_pool_t *pool,void*(*process)(void*arg),void*arg){
thread_pool_t *p= pool;
thread_worker_t *worker=NULL,*member=NULL;
worker = (thread_worker_t*)malloc(sizeof(struct thread_worker_s));
int incr=0;
if(worker==NULL){
return -1;
}
worker->process = process;
worker->arg = arg;
worker->next = NULL;
thread_pool_is_need_extend(pool);
pthread_mutex_lock(&(p->queue_lock));
member = p->head;
if(member!=NULL){
while(member->next!=NULL)
member = member->next;
member->next = worker;
}else{
p->head = worker;
}
p->queue_size ++;
pthread_mutex_unlock(&(p->queue_lock));
pthread_cond_signal(&(p->queue_ready));
return 1;
}
void thread_pool_wait(thread_pool_t *pool){
thread_info_t *thread;
int i=0;
for(i=0;i<pool->num;i++){
thread = (thread_info_t*)(pool->threads+i);
thread->state = THREAD_STATE_EXIT;
pthread_join(thread->id,NULL);
}
}
void thread_pool_destory(thread_pool_t *pool){
thread_pool_t *p = pool;
thread_worker_t *member = NULL;
if(p->is_destroy)
return ;
p->is_destroy = true;
pthread_cond_broadcast(&(p->queue_ready));
thread_pool_wait(pool);
free(p->threads);
p->threads = NULL;
/* Destroy task list */
while(p->head){
member = p->head;
p->head = member->next;
free(member);
}
/* Destroy thread list */
thread_info_t *tmp=NULL;
while(p->threads){
tmp = p->threads;
p->threads = tmp->next;
free(tmp);
}
pthread_mutex_destroy(&(p->queue_lock));
pthread_cond_destroy(&(p->queue_ready));
return ;
}
/* Through the thread id, Find the corresponding thread */
thread_info_t *get_thread_by_id(thread_pool_t *pool,pthread_t id){
thread_info_t *thread=NULL;
thread_info_t *p=pool->threads;
while(p!=NULL){
if(p->id==id)
return p;
p = p->next;
}
return NULL;
}
/* Each thread entry function */
void *thread_excute_route(void *arg){
thread_worker_t *worker = NULL;
thread_info_t *thread = NULL;
thread_pool_t* p = (thread_pool_t*)arg;
//printf("thread %lld create success\n",pthread_self());
while(1){
pthread_mutex_lock(&(p->queue_lock));
/* Gets the current thread's id*/
pthread_t pthread_id = pthread_self();
/* Set the current state */
thread = get_thread_by_id(p,pthread_id);
/* The thread pool is destroyed , And there are no tasks */
if(p->is_destroy==true && p->queue_size ==0){
pthread_mutex_unlock(&(p->queue_lock));
thread->state = THREAD_STATE_EXIT;
p->knum ++;
p->rnum --;
pthread_exit(NULL);
}
if(thread){
thread->state = THREAD_STATE_TASK_WAITING; /* The thread is waiting for the task */
}
/* The thread pool is not destroyed , No mission comes 1 Straight wait */
while(p->queue_size==0 && !p->is_destroy){
pthread_cond_wait(&(p->queue_ready),&(p->queue_lock));
}
p->queue_size--;
worker = p->head;
p->head = worker->next;
pthread_mutex_unlock(&(p->queue_lock));
if(thread)
thread->state = THREAD_STATE_TASK_PROCESSING; /* The thread is executing the task */
(*(worker->process))(worker->arg);
if(thread)
thread->state = THREAD_STATE_TASK_FINISHED; /* Mission accomplished */
free(worker);
worker = NULL;
}
}
/* Expand the thread */
void *thread_pool_is_need_extend(void *arg){
thread_pool_t *p = (thread_pool_t *)arg;
thread_pool_t *pool = p;
/* Determines whether additional threads are needed , The ultimate goal thread : task =1:2*/
if(p->queue_size>100){
int incr =0;
if(((float)p->rnum/p->queue_size) < THREAD_BUSY_PERCENT ){
incr = (p->queue_size*THREAD_BUSY_PERCENT) - p->rnum; /* The computation needs to increase the number of threads */
int i=0;
thread_info_t *tmp=NULL;
thread_pool_t *p = pool;
pthread_mutex_lock(&pool->queue_lock);
if(p->queue_size<100){
pthread_mutex_unlock(&pool->queue_lock);
return ;
}
for(i=0;i<incr;i++){
/* Create a thread */
tmp= (struct thread_info_s*)malloc(sizeof(struct thread_info_s));
if(tmp==NULL){
continue;
}else{
tmp->next = p->threads;
p->threads = tmp;
}
p->num ++;
p->rnum ++;
pthread_create(&(tmp->id),NULL,thread_excute_route,p);
tmp->state = THREAD_STATE_RUN;
}
pthread_mutex_unlock(&pool->queue_lock);
}
}
//pthread_cond_signal(&pool->extend_ready);
}
pthread_cond_t sum_ready;
/* Restores the initial number of threads */
void *thread_pool_is_need_recovery(void *arg){
thread_pool_t *pool = (thread_pool_t *)arg;
int i=0;
thread_info_t *tmp = NULL,*prev=NULL,*p1=NULL;
/* If there is no task , The current thread is greater than the number of initialized threads */
while(1){
i=0;
if(pool->queue_size==0 && pool->rnum > pool->init_num ){
sleep(5);
/*5s If I'm still in this state for a second , Destroy some threads */
if(pool->queue_size==0 && pool->rnum > pool->init_num ){
pthread_mutex_lock(&pool->queue_lock);
tmp = pool->threads;
while((pool->rnum != pool->init_num) && tmp){
/* Find the idle thread */
if(tmp->state != THREAD_STATE_TASK_PROCESSING){
i++;
if(prev)
prev->next = tmp->next;
else
pool->threads = tmp->next;
pool->rnum --; /* Running thread minus 1*/
pool->knum ++; /* Destroy the thread plus 1*/
kill(tmp->id,SIGKILL); /* Destruction of the thread */
p1 = tmp;
tmp = tmp->next;
free(p1);
continue;
}
prev = tmp;
tmp = tmp->next;
}
pthread_mutex_unlock(&pool->queue_lock);
printf("5s There is no new task to destroy some threads , destroyed %d A thread \n",i);
}
}
sleep(5);
}
}
/* print 1 Some of the information */
void *display_thread(void *arg){
thread_pool_t *p =(thread_pool_t *)arg;
thread_info_t *thread = NULL;
int i=0;
while(1){
printf("threads %d,running %d,killed %d\n",p->num,p->rnum,p->knum); /* The total number of threads , Is running , Has been destroyed */
thread = p->threads;
while(thread){
printf("id=%ld,state=%s\n",thread->id,thread_state_map[thread->state]);
thread = thread->next;
}
sleep(5);
}
}
I hope that this article is helpful for you to learn C language programming.