Description of message queues based on condition variables

  • 2020-04-01 21:35:13
  • OfStack

Condition variables are another mechanism for synchronization between threads. They give multiple threads a place to rendezvous. When a condition variable is used together with a mutex, threads can wait in a race-free manner for a particular condition to occur. This greatly reduces the thread scheduling and waiting caused by lock contention.
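To make the mutex-plus-condition-variable pattern concrete, here is a minimal sketch using pthreads. The ReadyFlag struct and the ready_flag_* function names are made up for illustration and are not part of the queue described in this post. The waiter checks the condition in a while loop, because pthread_cond_wait may wake up spuriously.

```cpp
#include <cassert>
#include <pthread.h>

// Hypothetical shared state: a flag guarded by a mutex, plus a condition
// variable that a waiter sleeps on until the flag is set.
struct ReadyFlag {
    pthread_mutex_t mutex;
    pthread_cond_t  cond;
    bool            ready;
};

void ready_flag_init(ReadyFlag* f) {
    pthread_mutex_init(&f->mutex, NULL);
    pthread_cond_init(&f->cond, NULL);
    f->ready = false;
}

void ready_flag_destroy(ReadyFlag* f) {
    pthread_cond_destroy(&f->cond);
    pthread_mutex_destroy(&f->mutex);
}

// Waiter: sleeps (no busy loop) until another thread sets `ready`.
// pthread_cond_wait releases the mutex while sleeping and re-acquires it
// before returning, so the flag is always read under the lock.
void ready_flag_wait(ReadyFlag* f) {
    pthread_mutex_lock(&f->mutex);
    while (!f->ready) {
        pthread_cond_wait(&f->cond, &f->mutex);
    }
    pthread_mutex_unlock(&f->mutex);
}

// Setter: changes the condition under the lock, then wakes one waiter.
void ready_flag_set(ReadyFlag* f) {
    pthread_mutex_lock(&f->mutex);
    f->ready = true;
    pthread_cond_signal(&f->cond);
    pthread_mutex_unlock(&f->mutex);
}

// Thread entry point that sets the flag it is given.
static void* setter_thread(void* arg) {
    ready_flag_set(static_cast<ReadyFlag*>(arg));
    return NULL;
}

// Demo: starts a setter thread, waits for the flag, and reports whether
// the waiter saw it set.
bool run_ready_flag_demo() {
    ReadyFlag flag;
    ready_flag_init(&flag);

    pthread_t tid;
    int rc = pthread_create(&tid, NULL, setter_thread, &flag);
    assert(rc == 0);

    ready_flag_wait(&flag);
    bool seen = flag.ready;

    pthread_join(tid, NULL);
    ready_flag_destroy(&flag);
    return seen;
}
```

Calling run_ready_flag_demo() from main() exercises both sides. The message queue below uses the same wait-in-a-loop and signal-under-lock steps, with "the write queue is not empty" as the condition.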

A message queue is something you cannot avoid in server-side development. I previously implemented a message queue based on a mutex and three queues, and it performed very well. Other bloggers in the blog garden have implemented many message queues based on ring queues and lock-free queues. The queue in this post roughly follows Java's BlockingQueue, which the previous blog post described briefly. The three-buffer queue minimizes contention between threads, but when there are few players and few messages it needs some filler data to be pushed in, which is probably a drawback.

What are message queues mainly used for in server development?

1: Mainly the interaction between the communication layer and the logic layer. After the communication layer has verified a packet received from the network, it passes the data to the logic layer through the message queue. The logic layer then passes the resulting packet back to the communication layer.

2: Separation of the logic thread from the database IO thread. The database IO thread is responsible for reads, writes and updates against the database. Each database operation issued by the logic layer is wrapped in a message and sent as a request to the database IO thread, which hands the result back to the logic layer once it has finished.

3: Logging. This works like case 2, except that a log message usually needs no reply.
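Case 2 can be sketched as two queues, one per direction. Everything in this sketch is assumed for illustration: the DbMessage fields, the MessageQueue class (a minimal stand-in, not the BlockingQueue from this post), and the "done: " reply text that takes the place of real database work.

```cpp
#include <cassert>
#include <pthread.h>
#include <queue>
#include <string>

// Hypothetical message; the field names are illustrative only.
struct DbMessage {
    int         id;
    std::string payload;  // request: the operation; reply: its result
    bool        stop;     // asks the IO thread to exit
};

// Minimal blocking queue guarded by one mutex and one condition variable.
class MessageQueue {
public:
    MessageQueue() {
        pthread_mutex_init(&_mutex, NULL);
        pthread_cond_init(&_cond, NULL);
    }
    ~MessageQueue() {
        pthread_cond_destroy(&_cond);
        pthread_mutex_destroy(&_mutex);
    }
    void push(DbMessage* m) {
        pthread_mutex_lock(&_mutex);
        _items.push(m);
        pthread_cond_signal(&_cond);
        pthread_mutex_unlock(&_mutex);
    }
    DbMessage* pop() {
        pthread_mutex_lock(&_mutex);
        while (_items.empty()) {
            pthread_cond_wait(&_cond, &_mutex);
        }
        DbMessage* m = _items.front();
        _items.pop();
        pthread_mutex_unlock(&_mutex);
        return m;
    }
private:
    MessageQueue(const MessageQueue&);             // non-copyable
    MessageQueue& operator=(const MessageQueue&);
    pthread_mutex_t        _mutex;
    pthread_cond_t         _cond;
    std::queue<DbMessage*> _items;
};

// One queue per direction between the logic thread and the IO thread.
struct Channels {
    MessageQueue requests;  // logic thread -> database IO thread
    MessageQueue replies;   // database IO thread -> logic thread
};

// Database IO thread: handles requests until it receives a stop message.
static void* db_io_thread(void* arg) {
    Channels* ch = static_cast<Channels*>(arg);
    for (;;) {
        DbMessage* req = ch->requests.pop();
        if (req->stop) {
            delete req;
            break;
        }
        req->payload = "done: " + req->payload;  // placeholder for DB work
        ch->replies.push(req);                   // hand the result back
    }
    return NULL;
}

// Logic-thread side: send one request, wait for its reply, shut down.
std::string round_trip(const std::string& statement) {
    Channels ch;
    pthread_t tid;
    int rc = pthread_create(&tid, NULL, db_io_thread, &ch);
    assert(rc == 0);

    DbMessage* req = new DbMessage();
    req->id = 1;
    req->payload = statement;
    req->stop = false;
    ch.requests.push(req);

    DbMessage* reply = ch.replies.pop();
    std::string result = reply->payload;
    delete reply;

    DbMessage* stop = new DbMessage();
    stop->stop = true;
    ch.requests.push(stop);
    pthread_join(tid, NULL);
    return result;
}
```

The logging case would use only the requests queue, since nothing has to come back. A real logic thread would normally keep processing other work rather than block on replies.pop() right after sending.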

Source code:

BlockingQueue.h file




#ifndef BLOCKINGQUEUE_H_ 
#define BLOCKINGQUEUE_H_ 

#include <queue> 
#include <pthread.h> 

typedef void* CommonItem; 

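// Blocking queue for any number of producer threads and one consumer thread. 
class BlockingQueue 
{ 
public: 
    BlockingQueue(); 

    virtual ~BlockingQueue(); 

    // Consumer side: removes items from the queue (despite the name) and 
    // blocks while no data is available. 
    int peek(CommonItem &item); 

    // Producer side: adds an item and wakes the waiting consumer. 
    int append(CommonItem item); 

private: 

    pthread_mutex_t _mutex;               // guards _write_queue 

    pthread_cond_t _cond;                 // signaled when an item is appended 

    std::queue<CommonItem> _read_queue;   // used only by the consumer, no lock 

    std::queue<CommonItem> _write_queue;  // filled by producers under _mutex 

}; 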

  
#endif 

BlockingQueue.cpp file code



#include "BlockingQueue.h" 

BlockingQueue::BlockingQueue() 
{ 
    pthread_mutex_init(&this->_mutex,NULL); 
    pthread_cond_init(&this->_cond,NULL); 
} 

BlockingQueue::~BlockingQueue() 
{ 
    pthread_mutex_destroy(&this->_mutex); 
    pthread_cond_destroy(&this->_cond); 
} 

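// Takes the next item from the queue, blocking until one is available. 
// _read_queue is accessed without the lock, so only one consumer thread 
// may call peek(). 
int BlockingQueue::peek(CommonItem &item) 
{ 
    if( this->_read_queue.empty() ) 
    { 
        // Refill: wait until producers have appended something, then move 
        // the whole batch to the read queue while holding the lock. 
        pthread_mutex_lock(&this->_mutex); 

        while(this->_write_queue.empty()) 
        { 
            pthread_cond_wait(&this->_cond,&this->_mutex); 
        } 

        while(!this->_write_queue.empty()) 
        { 
            this->_read_queue.push(this->_write_queue.front()); 
            this->_write_queue.pop(); 
        } 

        pthread_mutex_unlock(&this->_mutex); 
    } 

    // _read_queue is non-empty at this point, so every call returns an item. 
    item = this->_read_queue.front(); 
    this->_read_queue.pop(); 
    return 0; 
} 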

int BlockingQueue::append(CommonItem item) 
{ 
    pthread_mutex_lock(&this->_mutex); 
    this->_write_queue.push(item); 
    pthread_cond_signal(&this->_cond); 
    pthread_mutex_unlock(&this->_mutex); 
    return 0; 
}

Test code:

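#include <cstdio> 
#include <pthread.h> 
#include <sys/time.h> 
#include "BlockingQueue.h" 

// get_os_system_time() is not shown in the original post. This stand-in 
// assumes it returns the current time in milliseconds. 
long long int get_os_system_time() 
{ 
    struct timeval tv; 
    gettimeofday(&tv, NULL); 
    return (long long int)tv.tv_sec * 1000 + tv.tv_usec / 1000; 
} 

BlockingQueue _queue; 

// Producer: appends heap-allocated integers 0, 1, 2, ... without end. 
void* process(void* arg) 
{ 
    int i=0; 
    while(true) 
    { 
        int *j = new int(); 
        *j = i; 
        _queue.append((void *)j); 
        i ++; 
    } 
    return NULL; 
} 

int main(int argc,char** argv) 
{ 
    pthread_t pid; 
    pthread_create(&pid,0,process,0); 

    long long int start = get_os_system_time(); 
    while(true) 
    { 
        CommonItem item = NULL; 
        _queue.peek(item); 
        int* j = static_cast<int*>(item); 

        // Skip the iteration if no item was handed back. 
        if(j == NULL) 
            continue; 

        bool done = ((*j) == 100000); 
        delete j; // the consumer owns the item once it is dequeued 

        if(done) 
        { 
            long long int end = get_os_system_time(); 
            printf("consume %lld\n",end - start); 
            break; 
        } 
    } 

    return 0; 
}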

