The principle and implementation of a thread pool

  • 2020-04-02 01:36:59
  • OfStack

One. Introduction to thread pools
Usually we use multithreading by creating a new thread when one is needed, running a specific task in it, and letting the thread exit when the task is complete. This is sufficient for a typical application, which rarely needs to create a large number of threads only to destroy them after each has performed a simple task.
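The thread-per-task pattern described above can be sketched as follows. This is only an illustration: square_task and run_thread_per_task are hypothetical names, and the "task" is deliberately trivial so that thread creation and destruction dominate the work.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>

/* Hypothetical task: square one integer in place. */
static void *square_task(void *arg) {
    int *value = arg;
    *value = (*value) * (*value);
    return NULL;
}

/* Run every task in its own short-lived thread: create, run, join, discard.
 * Returns 0 on success or the pthread error code of the first failure. */
int run_thread_per_task(int *values, size_t count) {
    assert(values != NULL || count == 0);

    for (size_t i = 0; i < count; i++) {
        pthread_t tid;
        int err = pthread_create(&tid, NULL, square_task, &values[i]);
        if (err != 0)
            return err;
        /* The thread is gone after the join; the next task pays the
         * creation cost again. */
        err = pthread_join(tid, NULL);
        if (err != 0)
            return err;
    }
    return 0;
}
```

Every loop iteration pays for one thread creation and one destruction, which is exactly the overhead a thread pool tries to avoid.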

But some web, email, database, and similar server applications (a ring tone service, for example) must be prepared to handle a huge number of connection requests at any time, and the task behind each request can be very simple and need very little processing time. In that situation the application may end up constantly creating and destroying threads. Although creating a thread is considerably cheaper than creating a process, the cost of creating and destroying threads becomes significant when threads are created frequently and each one runs only briefly.

The purpose of a thread pool is to reduce the overhead of frequently creating and destroying threads. A thread pool is generally a pre-creation technique: a certain number of threads are created when the application starts. While the application runs, it requests an idle thread from the pool whenever it needs one to perform a task. When the task is finished the thread is not destroyed; it is returned to the pool, which continues to manage it. If all pre-created threads are in use when a new task arrives, the pool dynamically creates a new thread to serve the request. Conversely, at times the application may have few tasks to perform, leaving most threads in the pool idle. To save system resources, the pool then needs to destroy some of the idle threads. A thread pool therefore needs a manager that dynamically maintains the number of threads in it.
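The manager's job can be reduced to a small decision function. The sketch below is an assumption about one reasonable policy, not a description of any particular library: pool_decide and pool_action are hypothetical names, and the "fewer than half the threads are busy" rule for shrinking is chosen only for illustration.

```c
#include <assert.h>

typedef enum { POOL_KEEP, POOL_GROW, POOL_SHRINK } pool_action;

/* Decide how a pool manager might react to the current load.
 * busy and current are thread counts; has_pending_job is non-zero when a
 * task is waiting for a thread. */
pool_action pool_decide(int busy, int current, int min_threads,
                        int max_threads, int has_pending_job) {
    assert(0 <= busy && busy <= current);
    assert(0 <= min_threads && min_threads <= max_threads);

    /* Every thread is busy and a task is waiting: grow, if allowed. */
    if (has_pending_job && busy == current && current < max_threads)
        return POOL_GROW;

    /* Fewer than half of the threads are busy: shrink, but never below
     * the minimum. 2 * busy < current avoids floating point. */
    if (!has_pending_job && current > min_threads && 2 * busy < current)
        return POOL_SHRINK;

    return POOL_KEEP;
}
```

A pool at its maximum size with a pending task returns POOL_KEEP here; what happens to that task (queue it, reject it, or block the caller) is a separate design decision.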

In this way, the thread pool amortizes the overhead of creating and destroying a thread over the tasks that thread performs: the more tasks a thread executes, the smaller the share of that overhead each task bears.
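As a rough model (the symbols are introduced here for illustration only), let C_create and C_destroy be the costs of creating and destroying one thread, and let n be the number of tasks a pooled thread executes during its lifetime:

```latex
\text{overhead per task} = \frac{C_{\text{create}} + C_{\text{destroy}}}{n}
```

With one thread per task, n = 1 and every task pays the full cost; as n grows, the share borne by each task tends toward zero. The model ignores the pool's own bookkeeping costs, such as locking and the management thread.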

Of course, if the cost of creating and destroying a thread is negligible compared to the time the thread spends executing its task, a thread pool is not necessary. Applications such as FTP and Telnet are examples.

Two. The design of the thread pool
Next, a simple thread pool is implemented in C. To make the pool more convenient to use, some object-oriented ideas are added to the C implementation. Unlike Objective-C, it uses only structs (with function pointers) to simulate C++ classes, an approach that is also widely seen in the Linux kernel.
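For readers unfamiliar with this style, here is a minimal sketch of a "class" built from a struct and function pointers. The counter type and its functions are hypothetical and unrelated to the thread pool; they only show the pattern of passing the object explicitly as the first argument.

```c
#include <assert.h>
#include <stdlib.h>

typedef struct counter_s counter;

/* A "class": data members plus function pointers acting as methods. */
struct counter_s {
    int value;
    void (*add)(counter *self, int amount);
    int  (*get)(const counter *self);
};

static void counter_add(counter *self, int amount) {
    assert(self != NULL);
    self->value += amount;
}

static int counter_get(const counter *self) {
    assert(self != NULL);
    return self->value;
}

/* "Constructor": allocate the object and wire up its method pointers.
 * Returns NULL if memory cannot be allocated. */
counter *counter_create(int initial) {
    counter *c = malloc(sizeof *c);
    if (c == NULL)
        return NULL;
    c->value = initial;
    c->add = counter_add;
    c->get = counter_get;
    return c;
}

/* "Destructor". */
void counter_destroy(counter *c) {
    free(c);
}
```

A call then reads c->add(c, 3), which mirrors c.add(3) in C++ except that the object pointer is passed by hand.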

In this library, the main user-related interfaces are:


    typedef struct tp_work_desc_s tp_work_desc;     //information a thread needs to perform a task
    typedef struct tp_work_s tp_work;               //a task performed by a thread
    typedef struct tp_thread_info_s tp_thread_info; //each thread's id, whether it is idle, the task it is performing, and so on
    typedef struct tp_thread_pool_s tp_thread_pool; //interfaces and state of the thread pool

    //thread parm
    struct tp_work_desc_s{
        ...
    };

    //base thread struct
    struct tp_work_s{
        //main process function. user interface
        void (*process_job)(tp_work *this, tp_work_desc *job);
    };

    tp_thread_pool *creat_thread_pool(int min_num, int max_num);

tp_work_desc_s describes the information a thread needs in order to perform a task. It is passed to the thread as its parameter, and its content is defined by the user according to the application. tp_work_s describes what we want the thread to do. When we request a thread, we first fill in these two structures: what the thread should do, and what additional information it needs to do it. The interface function creat_thread_pool creates a thread pool instance; its arguments are the minimum number of threads min_num and the maximum number of threads max_num that the instance can hold. The minimum is the number of threads created when the pool is initialized, and its value directly affects how effective the pool is. If it is too small, the pre-created threads are used up quickly and new threads must be created to meet requests; if it is too large, many threads may sit idle. We need to choose it according to the actual needs of the application. The structure of the thread pool is described as follows:
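The pairing of "what to do" and "what it needs" can be tried out in isolation. The sketch below uses hypothetical stand-in types (demo_work, demo_job) with the same shape as tp_work_s and tp_work_desc_s; the handled field and the dispatch function are assumptions added for the demonstration.

```c
#include <assert.h>
#include <stddef.h>

typedef struct demo_job_s  demo_job;
typedef struct demo_work_s demo_work;

/* "What the task needs": defined by the application. */
struct demo_job_s {
    const char *caller;   /* call in  */
    const char *callee;   /* call out */
    int channel;          /* channel number */
    int handled;          /* set by the task so the result can be checked */
};

/* "What the thread does": one callback that receives the job. */
struct demo_work_s {
    void (*process_job)(demo_work *self, demo_job *job);
};

/* A sample task: just mark the job as handled. */
static void mark_handled(demo_work *self, demo_job *job) {
    (void)self;
    job->handled = 1;
}

/* What a worker thread does once it has been given a (work, job) pair. */
void dispatch(demo_work *work, demo_job *job) {
    assert(work != NULL && work->process_job != NULL && job != NULL);
    work->process_job(work, job);
}
```

Because the worker only ever calls work->process_job(work, job), the same pool can run completely different tasks without knowing anything about them.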

    //main thread pool struct
    struct tp_thread_pool_s{
        TPBOOL (*init)(tp_thread_pool *this);
        void (*close)(tp_thread_pool *this);
        void (*process_job)(tp_thread_pool *this, tp_work *worker, tp_work_desc *job);
        int  (*get_thread_by_id)(tp_thread_pool *this, int id);
        TPBOOL (*add_thread)(tp_thread_pool *this);
        TPBOOL (*delete_thread)(tp_thread_pool *this);
        int (*get_tp_status)(tp_thread_pool *this);

        int min_th_num;                 //min thread number in the pool
        int cur_th_num;                 //current thread number in the pool
        int max_th_num;                 //max thread number in the pool
        pthread_mutex_t tp_lock;
        pthread_t manage_thread_id;     //manage thread id num
        tp_thread_info *thread_info;    //work thread relative thread info
    };

The tp_thread_info_s structure describes each thread's id, whether it is idle, the task it is performing, and so on. The user does not need to care about it.

    //thread info
    struct tp_thread_info_s{
        pthread_t       thread_id;      //thread id num
        TPBOOL          is_busy;        //thread status: true-busy; false-idle
        pthread_cond_t  thread_cond;
        pthread_mutex_t thread_lock;
        tp_work         *th_work;
        tp_work_desc    *th_job;
    };

The tp_thread_pool_s structure contains the interfaces and variables for thread pool operations. After creat_thread_pool returns a thread pool instance, the instance must first be initialized explicitly through its init interface. During initialization the pool pre-creates the minimum number of threads; these threads are blocked, so they consume no CPU, although they do take up some memory. init also creates a management thread, which runs for the lifetime of the pool. It periodically checks the state of the pool and, if too many threads are idle, removes some of the idle ones, though it never lets the number of threads drop below the specified minimum.
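How a pre-created thread can stay blocked without consuming CPU is easiest to see with a single worker and a condition variable. The following is a minimal sketch under stated assumptions: job_slot, slot_worker, and run_one_job are hypothetical names, and the "job" merely doubles an integer.

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>
#include <stddef.h>

typedef struct {
    pthread_mutex_t lock;
    pthread_cond_t  job_ready;   /* signalled when has_job becomes true */
    pthread_cond_t  job_done;    /* signalled when done becomes true */
    bool has_job;                /* predicates, both guarded by lock */
    bool done;
    int  input;
    int  result;
} job_slot;

static void *slot_worker(void *arg) {
    job_slot *slot = arg;

    /* Sleep inside pthread_cond_wait until a job is handed over. The loop
     * re-checks the predicate, so spurious wakeups and signals sent before
     * the wait began are both handled. */
    pthread_mutex_lock(&slot->lock);
    while (!slot->has_job)
        pthread_cond_wait(&slot->job_ready, &slot->lock);
    int input = slot->input;
    pthread_mutex_unlock(&slot->lock);

    int output = input * 2;      /* the job runs outside the lock */

    pthread_mutex_lock(&slot->lock);
    slot->result = output;
    slot->has_job = false;
    slot->done = true;
    pthread_cond_signal(&slot->job_done);
    pthread_mutex_unlock(&slot->lock);
    return NULL;
}

/* Start an idle worker, hand it one job, and wait for the result.
 * Returns 0 on success, -1 if the thread could not be created. */
int run_one_job(int input, int *result) {
    job_slot slot = { .has_job = false, .done = false };
    pthread_t tid;

    assert(result != NULL);
    pthread_mutex_init(&slot.lock, NULL);
    pthread_cond_init(&slot.job_ready, NULL);
    pthread_cond_init(&slot.job_done, NULL);

    if (pthread_create(&tid, NULL, slot_worker, &slot) == 0) {
        pthread_mutex_lock(&slot.lock);
        slot.input = input;
        slot.has_job = true;                  /* set the predicate first ... */
        pthread_cond_signal(&slot.job_ready); /* ... then wake the worker */
        while (!slot.done)
            pthread_cond_wait(&slot.job_done, &slot.lock);
        *result = slot.result;
        pthread_mutex_unlock(&slot.lock);
        pthread_join(tid, NULL);
    } else {
        slot.done = false;
    }

    pthread_cond_destroy(&slot.job_done);
    pthread_cond_destroy(&slot.job_ready);
    pthread_mutex_destroy(&slot.lock);
    return slot.done ? 0 : -1;
}
```

The important detail is that the waiter tests a flag under the mutex instead of relying on the signal alone; a bare pthread_cond_wait would miss a signal that arrives before the thread starts waiting.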

After the thread pool has been created and initialized, we can fill in the tp_work_s and tp_work_desc_s structures and hand them to the pool's process_job interface for execution. That is all we need to know to use this thread pool. When the pool is no longer needed, it can be destroyed with the close interface.

Three. Implementation code
thread-pool.h (header file):


#ifndef THREAD_POOL_H
#define THREAD_POOL_H

#include <stdio.h>
#include <stdlib.h>
#include <string.h>     //memset
#include <unistd.h>     //sleep
#include <sys/types.h>
#include <pthread.h>
#include <signal.h>

#ifndef TPBOOL
typedef int TPBOOL;
#endif

#ifndef TRUE
#define TRUE 1
#endif

#ifndef FALSE
#define FALSE 0
#endif

#define BUSY_THRESHOLD 0.5  //(busy threads)/(all threads) threshold
#define MANAGE_INTERVAL 5   //tp manage thread sleep interval, in seconds

typedef struct tp_work_desc_s tp_work_desc;
typedef struct tp_work_s tp_work;
typedef struct tp_thread_info_s tp_thread_info;
typedef struct tp_thread_pool_s tp_thread_pool;

//thread parm
struct tp_work_desc_s{
    char *inum; //call in
    char *onum; //call out
    int chnum;  //channel num
};

//base thread struct
struct tp_work_s{
    //main process function. user interface
    void (*process_job)(tp_work *this, tp_work_desc *job);
};

//thread info
struct tp_thread_info_s{
    pthread_t       thread_id;      //thread id num
    TPBOOL          is_busy;        //thread status: true-busy; false-idle
    pthread_cond_t  thread_cond;
    pthread_mutex_t thread_lock;
    tp_work         *th_work;
    tp_work_desc    *th_job;
};

//main thread pool struct
struct tp_thread_pool_s{
    TPBOOL (*init)(tp_thread_pool *this);
    void (*close)(tp_thread_pool *this);
    void (*process_job)(tp_thread_pool *this, tp_work *worker, tp_work_desc *job);
    int  (*get_thread_by_id)(tp_thread_pool *this, int id);
    TPBOOL (*add_thread)(tp_thread_pool *this);
    TPBOOL (*delete_thread)(tp_thread_pool *this);
    int (*get_tp_status)(tp_thread_pool *this);

    int min_th_num;                 //min thread number in the pool
    int cur_th_num;                 //current thread number in the pool
    int max_th_num;                 //max thread number in the pool
    pthread_mutex_t tp_lock;
    pthread_t manage_thread_id;     //manage thread id num
    tp_thread_info *thread_info;    //work thread relative thread info
};

tp_thread_pool *creat_thread_pool(int min_num, int max_num);

#endif //THREAD_POOL_H

thread-pool.c (implementation file):

#include <string.h>     //memset
#include <unistd.h>     //sleep
#include "thread-pool.h"

static void *tp_work_thread(void *pthread);
static void *tp_manage_thread(void *pthread);

static TPBOOL tp_init(tp_thread_pool *this);
static void tp_close(tp_thread_pool *this);
static void tp_process_job(tp_thread_pool *this, tp_work *worker, tp_work_desc *job);
static int  tp_get_thread_by_id(tp_thread_pool *this, int id);
static TPBOOL tp_add_thread(tp_thread_pool *this);
static TPBOOL tp_delete_thread(tp_thread_pool *this);
static int  tp_get_tp_status(tp_thread_pool *this);

//Thread ids are printed as unsigned long. pthread_t is an opaque type, so
//the cast assumes an integer-like pthread_t, as on Linux.
#define TP_ID(t) ((unsigned long)(t))

//cleanup handler: release a mutex if a thread is cancelled while holding it
static void tp_unlock_mutex(void *mutex){
    pthread_mutex_unlock((pthread_mutex_t*)mutex);
}

//create one worker in the next free slot; the caller must hold tp_lock
static TPBOOL tp_start_thread(tp_thread_pool *this, TPBOOL busy){
    int err;
    tp_thread_info *info;

    if( this->max_th_num <= this->cur_th_num )
        return FALSE;

    info = &this->thread_info[this->cur_th_num];
    pthread_cond_init(&info->thread_cond, NULL);
    pthread_mutex_init(&info->thread_lock, NULL);
    info->is_busy = busy;

    err = pthread_create(&info->thread_id, NULL, tp_work_thread, this);
    if(0 != err){
        //the slot is part of the thread_info array, so it must not be free()d
        pthread_cond_destroy(&info->thread_cond);
        pthread_mutex_destroy(&info->thread_lock);
        info->is_busy = FALSE;
        return FALSE;
    }

    this->cur_th_num++;
    return TRUE;
}

//cancel one thread, wait for it to exit, and release its cond & mutex.
//kill() must not be used here: it signals a whole process, not a thread.
static void tp_stop_thread(tp_thread_info *info){
    pthread_cancel(info->thread_id);
    pthread_join(info->thread_id, NULL);
    pthread_mutex_destroy(&info->thread_lock);
    pthread_cond_destroy(&info->thread_cond);
}


tp_thread_pool *creat_thread_pool(int min_num, int max_num){
    tp_thread_pool *this;

    if(min_num < 0 || max_num < 1 || max_num < min_num)
        return NULL;

    this = (tp_thread_pool*)malloc(sizeof(tp_thread_pool));
    if(NULL == this)
        return NULL;
    memset(this, 0, sizeof(tp_thread_pool));

    //init member function pointers
    this->init = tp_init;
    this->close = tp_close;
    this->process_job = tp_process_job;
    this->get_thread_by_id = tp_get_thread_by_id;
    this->add_thread = tp_add_thread;
    this->delete_thread = tp_delete_thread;
    this->get_tp_status = tp_get_tp_status;

    //init member vars; no thread exists until init() is called
    this->min_th_num = min_num;
    this->cur_th_num = 0;
    this->max_th_num = max_num;
    pthread_mutex_init(&this->tp_lock, NULL);

    //zero-filled info array: every slot starts idle with no job
    this->thread_info = (tp_thread_info*)calloc(max_num, sizeof(tp_thread_info));
    if(NULL == this->thread_info){
        pthread_mutex_destroy(&this->tp_lock);
        free(this);
        return NULL;
    }

    return this;
}


static TPBOOL tp_init(tp_thread_pool *this){
    int i;
    int err;
    TPBOOL ok = TRUE;

    //creat work threads. tp_lock is held so that a new worker cannot look
    //itself up before its thread_id has been stored (see tp_work_thread).
    pthread_mutex_lock(&this->tp_lock);
    while(ok && this->cur_th_num < this->min_th_num){
        ok = tp_start_thread(this, FALSE);
        if(ok)
            printf("tp_init: creat work thread %lu\n", TP_ID(this->thread_info[this->cur_th_num-1].thread_id));
        else
            printf("tp_init: creat work thread failed\n");
    }
    pthread_mutex_unlock(&this->tp_lock);

    //creat manage thread
    if(ok){
        err = pthread_create(&this->manage_thread_id, NULL, tp_manage_thread, this);
        if(0 != err){
            printf("tp_init: creat manage thread failed\n");
            ok = FALSE;
        }
    }

    if(!ok){
        //roll back: stop the workers that were already started
        for(i=0;i<this->cur_th_num;i++)
            tp_stop_thread(&this->thread_info[i]);
        this->cur_th_num = 0;
        return FALSE;
    }
    printf("tp_init: creat manage thread %lu\n", TP_ID(this->manage_thread_id));

    return TRUE;
}


//Call only after a successful init(), and only when no other thread is
//still calling process_job(). The pool struct itself is left for the
//caller to free().
static void tp_close(tp_thread_pool *this){
    int i;

    //close manage thread first, so nobody else changes cur_th_num
    pthread_cancel(this->manage_thread_id);
    pthread_join(this->manage_thread_id, NULL);
    printf("tp_close: stopped manage thread %lu\n", TP_ID(this->manage_thread_id));

    //close work threads; a thread that is running a job finishes it first
    for(i=0;i<this->cur_th_num;i++){
        tp_stop_thread(&this->thread_info[i]);
        printf("tp_close: stopped work thread %lu\n", TP_ID(this->thread_info[i].thread_id));
    }
    this->cur_th_num = 0;

    pthread_mutex_destroy(&this->tp_lock);

    //free thread struct
    free(this->thread_info);
    this->thread_info = NULL;
}


static void tp_process_job(tp_thread_pool *this, tp_work *worker, tp_work_desc *job){
    int i;
    tp_thread_info *info;

    //tp_lock serializes job submission with the manage thread, which may
    //shrink the pool while it is being scanned
    pthread_mutex_lock(&this->tp_lock);

    for(i=0;i<this->cur_th_num;i++){
        info = &this->thread_info[i];
        pthread_mutex_lock(&info->thread_lock);
        if(!info->is_busy){
            //hand over the job and set the thread busy under its own lock,
            //so the worker never sees a half-filled slot
            info->th_work = worker;
            info->th_job = job;
            info->is_busy = TRUE;
            printf("tp_process_job: informing idle working thread %d, thread id is %lu\n", i, TP_ID(info->thread_id));
            pthread_cond_signal(&info->thread_cond);
            pthread_mutex_unlock(&info->thread_lock);
            pthread_mutex_unlock(&this->tp_lock);
            return;
        }
        pthread_mutex_unlock(&info->thread_lock);
    }//end of for

    //all current threads are busy: try to create a new thread
    if(this->cur_th_num < this->max_th_num){
        //fill in the job before the thread exists; add_thread creates the
        //thread in the busy state, so it starts on the job immediately
        info = &this->thread_info[this->cur_th_num];
        info->th_work = worker;
        info->th_job = job;
        if( this->add_thread(this) ){
            printf("tp_process_job: new working thread %d, thread id is %lu\n", this->cur_th_num-1, TP_ID(info->thread_id));
            pthread_mutex_unlock(&this->tp_lock);
            return;
        }
        info->th_work = NULL;
        info->th_job = NULL;
    }
    pthread_mutex_unlock(&this->tp_lock);

    //the pool is at max_th_num, or thread creation failed: the job is not run
    fprintf(stderr, "tp_process_job: no thread available, job dropped\n");
}


//The caller should hold tp_lock. The header declares id as int, so this
//lookup only works where a pthread_t value fits in an int; the worker
//thread compares ids with pthread_equal() instead of calling it.
static int tp_get_thread_by_id(tp_thread_pool *this, int id){
    int i;

    for(i=0;i<this->cur_th_num;i++){
        if(id == this->thread_info[i].thread_id)
            return i;
    }

    return -1;
}


//the caller must hold tp_lock
static TPBOOL tp_add_thread(tp_thread_pool *this){
    //init status is busy: the job is already stored in the slot
    if( !tp_start_thread(this, TRUE) )
        return FALSE;
    printf("tp_add_thread: creat work thread %lu\n", TP_ID(this->thread_info[this->cur_th_num-1].thread_id));

    return TRUE;
}


//the caller must hold tp_lock
static TPBOOL tp_delete_thread(tp_thread_pool *this){
    tp_thread_info *last;
    TPBOOL busy;

    //current thread num can't < min thread num
    if(this->cur_th_num <= this->min_th_num) return FALSE;

    //if last thread is busy, do nothing
    last = &this->thread_info[this->cur_th_num-1];
    pthread_mutex_lock(&last->thread_lock);
    busy = last->is_busy;
    pthread_mutex_unlock(&last->thread_lock);
    if(busy) return FALSE;

    //the thread is idle and cannot be given work while tp_lock is held
    tp_stop_thread(last);

    //after deleting idle thread, current thread num -1
    this->cur_th_num--;

    return TRUE;
}


//the caller must hold tp_lock
static int  tp_get_tp_status(tp_thread_pool *this){
    float busy_num = 0.0;
    int i;

    if(this->cur_th_num == 0)
        return 1;

    //get busy thread number
    for(i=0;i<this->cur_th_num;i++){
        pthread_mutex_lock(&this->thread_info[i].thread_lock);
        if(this->thread_info[i].is_busy)
            busy_num++;
        pthread_mutex_unlock(&this->thread_info[i].thread_lock);
    }

    if(busy_num/(this->cur_th_num) < BUSY_THRESHOLD)
        return 0;//idle status
    else
        return 1;//busy or normal status
}


static void *tp_work_thread(void *pthread){
    pthread_t curid;//current thread id
    int nseq = -1;//current thread seq in the this->thread_info array
    int i;
    tp_thread_pool *this = (tp_thread_pool*)pthread;//main thread pool struct instance
    tp_thread_info *info;
    tp_work *work;
    tp_work_desc *job;

    //cancellation is only allowed while waiting for a job (see below)
    pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);

    //get current thread id
    curid = pthread_self();

    //get current thread's seq in the thread info struct array. Taking
    //tp_lock waits until the creator has stored thread_id and cur_th_num.
    pthread_mutex_lock(&this->tp_lock);
    for(i=0;i<this->cur_th_num;i++){
        if(pthread_equal(curid, this->thread_info[i].thread_id)){
            nseq = i;
            break;
        }
    }
    pthread_mutex_unlock(&this->tp_lock);
    if(nseq < 0)
        return NULL;
    info = &this->thread_info[nseq];
    printf("entering working thread %d, thread id is %lu\n", nseq, TP_ID(curid));

    while( TRUE ){
        //wait until a job has been assigned. is_busy is the predicate, so
        //spurious wakeups and signals sent before the wait are both handled.
        pthread_mutex_lock(&info->thread_lock);
        pthread_cleanup_push(tp_unlock_mutex, &info->thread_lock);
        pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
        while(!info->is_busy)
            pthread_cond_wait(&info->thread_cond, &info->thread_lock);
        pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
        pthread_cleanup_pop(0);
        work = info->th_work;
        job = info->th_job;
        pthread_mutex_unlock(&info->thread_lock);

        printf("%lu thread do work!\n", TP_ID(curid));

        //process
        work->process_job(work, job);

        //thread state be set idle after work
        pthread_mutex_lock(&info->thread_lock);
        info->th_work = NULL;
        info->th_job = NULL;
        info->is_busy = FALSE;
        pthread_mutex_unlock(&info->thread_lock);

        printf("%lu do work over\n", TP_ID(curid));
    }

    return NULL;
}


static void *tp_manage_thread(void *pthread){
    tp_thread_pool *this = (tp_thread_pool*)pthread;//main thread pool struct instance

    //cancellation is only allowed while sleeping, never while tp_lock is held
    pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);

    while(TRUE){
        pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
        sleep(MANAGE_INTERVAL);
        pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);

        pthread_mutex_lock(&this->tp_lock);
        if( this->get_tp_status(this) == 0 ){
            //mostly idle: remove idle threads from the end of the array
            while( this->delete_thread(this) )
                ;
        }//end for if
        pthread_mutex_unlock(&this->tp_lock);
    }

    return NULL;
}

Four. Database connection pool introduction      
Database connections are a critical, limited, and expensive resource, especially in multi-user web applications.

A database connection object corresponds to one physical database connection. If every operation opens a physical connection and closes it after use, system performance suffers. Connection pooling solves this by creating enough database connections at application startup to form a connection pool (simply put, a "pool" holding many ready-made database connection objects), from which the application dynamically requests, uses, and releases connections. Concurrent requests beyond the number of connections in the pool wait in a request queue. The application can also grow or shrink the number of connections in the pool according to how heavily they are used.

Connection pooling reuses memory resources as much as possible, saves a great deal of memory, improves server efficiency, and lets the server support more clients. With a connection pool the program becomes much more efficient, and the pool's own management mechanism lets us monitor the number of database connections, their usage, and so on.

1)   The minimum number of connections is the number of database connections that the pool always keeps open. If the application uses only a few connections, a large minimum wastes database connection resources.

2)   The maximum number of connections is the largest number of connections the pool can hand out. If connection requests exceed this number, the additional requests are placed in a wait queue, which delays the database operations behind them.
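The behaviour at the maximum can be sketched with a mutex and a condition variable. This is a simplified illustration, not a real database client: conn_pool and its functions are hypothetical, integer slot numbers stand in for connection objects, and POOL_MAX is an arbitrary limit. Waiting callers block on a condition variable, which approximates the wait queue but does not guarantee first-come-first-served order.

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>

#define POOL_MAX 4               /* hypothetical upper bound on pool size */

typedef struct {
    pthread_mutex_t lock;
    pthread_cond_t  available;   /* signalled when a connection is released */
    bool in_use[POOL_MAX];
    int  size;                   /* connections in the pool, <= POOL_MAX */
    int  free_count;
} conn_pool;

void conn_pool_init(conn_pool *p, int size) {
    assert(p != NULL && size > 0 && size <= POOL_MAX);
    pthread_mutex_init(&p->lock, NULL);
    pthread_cond_init(&p->available, NULL);
    for (int i = 0; i < POOL_MAX; i++)
        p->in_use[i] = false;
    p->size = size;
    p->free_count = size;
}

void conn_pool_destroy(conn_pool *p) {
    pthread_cond_destroy(&p->available);
    pthread_mutex_destroy(&p->lock);
}

/* Mark the first free slot as used; the caller must hold p->lock.
 * Returns the slot number, or -1 if every connection is in use. */
static int take_free_slot(conn_pool *p) {
    for (int i = 0; i < p->size; i++) {
        if (!p->in_use[i]) {
            p->in_use[i] = true;
            p->free_count--;
            return i;
        }
    }
    return -1;
}

/* Non-blocking request: returns -1 when the pool is exhausted. */
int conn_pool_try_acquire(conn_pool *p) {
    pthread_mutex_lock(&p->lock);
    int id = take_free_slot(p);
    pthread_mutex_unlock(&p->lock);
    return id;
}

/* Blocking request: waits until some caller releases a connection. */
int conn_pool_acquire(conn_pool *p) {
    pthread_mutex_lock(&p->lock);
    while (p->free_count == 0)
        pthread_cond_wait(&p->available, &p->lock);
    int id = take_free_slot(p);
    pthread_mutex_unlock(&p->lock);
    return id;
}

/* Return a connection to the pool and wake one waiting caller. */
void conn_pool_release(conn_pool *p, int id) {
    pthread_mutex_lock(&p->lock);
    assert(id >= 0 && id < p->size && p->in_use[id]);
    p->in_use[id] = false;
    p->free_count++;
    pthread_cond_signal(&p->available);
    pthread_mutex_unlock(&p->lock);
}
```

A real pool would store connection handles in the slots and would usually add a timeout to the blocking request, so that a caller is not left waiting forever when every connection is held.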

