C++ multithreading based on a message queue: example implementation

  • 2020-06-15 09:50:18
  • OfStack

Preface

The key to implementing a message queue is synchronizing access to it from different threads. The implementation relies on several building blocks, introduced below.

Introduction to std::lock_guard

std::lock_guard is a template class defined in C++11. Its declaration is as follows:


template <class Mutex> class lock_guard;

A lock_guard object is typically used to manage a mutex (lock) object: it applies RAII to the mutex, which makes it easy for a thread to hold the lock safely. In other words, for the lifetime of a lock_guard object, the mutex it manages remains locked; when the lock_guard's lifetime ends, the mutex it manages is unlocked (compare how smart pointers such as shared_ptr manage dynamically allocated memory).

The template parameter Mutex represents a mutex type, such as std::mutex, and should satisfy the BasicLockable requirements. The standard library provides several such types: std::mutex, std::recursive_mutex, std::timed_mutex, std::recursive_timed_mutex and std::unique_lock.
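
A minimal usage sketch (the g_mutex and g_counter names are illustrative, not part of the queue code below):


#include <mutex>

std::mutex g_mutex;
int g_counter = 0;

void Increment()
{
 // g_mutex is locked when lock is constructed and unlocked automatically
 // when lock goes out of scope, even if an exception is thrown.
 std::lock_guard<std::mutex> lock(g_mutex);
 ++g_counter;
}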

Introduction to std::unique_lock

The biggest drawback of lock_guard is that it gives the programmer little flexibility. The C++11 standard therefore defines another RAII class for mutexes, unique_lock. It is similar to lock_guard and just as convenient for locking a mutex, but it offers finer control over locking and unlocking.
As the name implies, a unique_lock object manages the locking and unlocking of a mutex object with exclusive ownership (unique ownership): no other unique_lock object owns that mutex at the same time.

A newly created unique_lock object that manages a mutex object m calls m.lock() to lock it. If the mutex is already held, for example through another unique_lock object managing m, the current thread blocks until it can acquire the lock.
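
A minimal sketch of the extra flexibility compared with lock_guard (the names are illustrative):


#include <mutex>

std::mutex g_mutex;

void Example()
{
 std::unique_lock<std::mutex> lock(g_mutex); // locks g_mutex immediately
 // ... critical section ...
 lock.unlock(); // unlike lock_guard, the lock can be released early
 // ... work that does not need the mutex ...
 lock.lock();   // and re-acquired later; the destructor unlocks it if still held
}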

Introduction to std::condition_variable

When a wait function of a std::condition_variable object is called, it uses a std::unique_lock (over a std::mutex) to block the current thread. The current thread stays blocked until another thread calls a notify function on the same std::condition_variable object to wake it up.

std::condition_variable provides two wait() overloads. After calling wait(), the current thread blocks (at this point it should already hold the mutex) until another thread calls notify_* and wakes it up.

While the thread is blocked, wait() automatically calls lck.unlock() to release the mutex so that other threads blocked on the same mutex can proceed. Once the current thread is notified (usually because another thread called notify_*), wait() automatically calls lck.lock() again, leaving lck in the same state it had when wait() was called.

In the second overload, which takes a predicate, wait() blocks the current thread only while pred is false, and a notification unblocks it only if pred is then true. The second overload is therefore equivalent to the following code:


while (!pred()) wait(lck);
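
A minimal sketch of the predicate overload (the g_mutex, g_cv and g_ready names are placeholders, not part of the queue class below):


#include <mutex>
#include <condition_variable>

std::mutex g_mutex;
std::condition_variable g_cv;
bool g_ready = false;

void Consumer()
{
 std::unique_lock<std::mutex> lock(g_mutex);
 // Equivalent to: while (!g_ready) g_cv.wait(lock);
 g_cv.wait(lock, []{ return g_ready; });
 // Here g_ready is true and the mutex is held again.
}

void Producer()
{
 {
  std::lock_guard<std::mutex> lock(g_mutex);
  g_ready = true;
 }
 g_cv.notify_one();
}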

Introduction to std::function

Using std::function you can treat ordinary functions, lambda expressions and function objects uniformly. They are not of the same type, but wrapped in the function template class they become objects of one type, so they can be stored in a vector or another container and invoked later as callbacks.
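
A minimal sketch of storing different kinds of callables in one container (the names here are illustrative):


#include <functional>
#include <vector>
#include <iostream>

void FreeFunction(int x) { std::cout << "free function: " << x << std::endl; }

struct Functor {
 void operator()(int x) const { std::cout << "function object: " << x << std::endl; }
};

int main()
{
 std::vector<std::function<void(int)>> callbacks;
 callbacks.push_back(FreeFunction);                                             // ordinary function
 callbacks.push_back([](int x){ std::cout << "lambda: " << x << std::endl; });  // lambda expression
 callbacks.push_back(Functor());                                                // function object

 for (auto& cb : callbacks)
  cb(42);
 return 0;
}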

The message queue implementation:


#pragma once

#ifndef MESSAGE_QUEUE_H
#define MESSAGE_QUEUE_H

#include <queue>
#include <mutex>
#include <condition_variable>

template<class Type>
class CMessageQueue
{
public:
 CMessageQueue& operator = (const CMessageQueue&) = delete;
 CMessageQueue(const CMessageQueue& mq) = delete;


 CMessageQueue() :_queue(), _mutex(), _condition(){}
 virtual ~CMessageQueue(){}

 void Push(Type msg){
  std::lock_guard <std::mutex> lock(_mutex);
  _queue.push(msg);
   // Notify one thread waiting in the blocking Pop() that a new message has arrived
  _condition.notify_one();
 }
  // isBlocked selects the retrieval mode: blocking (synchronous) or non-blocking
 bool Pop(Type& msg, bool isBlocked = true){
  if (isBlocked)
  {
   std::unique_lock <std::mutex> lock(_mutex);
   while (_queue.empty())
   {
    _condition.wait(lock);
    
   }
    // Note: this block must stay inside the if statement, because the unique_lock's lifetime is limited to these braces
   msg = std::move(_queue.front());
   _queue.pop();
   return true;
   
  }
  else
  {
   std::lock_guard<std::mutex> lock(_mutex);
   if (_queue.empty())
    return false;


   msg = std::move(_queue.front());
   _queue.pop();
   return true;
  }

 }

 int32_t Size(){
  std::lock_guard<std::mutex> lock(_mutex);
  return _queue.size();
 }

 bool Empty(){
  std::lock_guard<std::mutex> lock(_mutex);
  return _queue.empty();
 }
private:
 std::queue<Type> _queue;// A queue where messages are stored 
 mutable std::mutex _mutex;// Synchronization lock 
 std::condition_variable _condition;// Achieve synchronous retrieval of messages 
};

#endif//MESSAGE_QUEUE_H
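
A minimal usage sketch of the queue above, assuming one producer and one consumer thread (the message count of 5 and the int message type are arbitrary):


#include <thread>
#include <iostream>
#include "MessageQueue.h"

int main()
{
 CMessageQueue<int> queue;

 std::thread consumer([&queue]{
  for (int i = 0; i < 5; i++)
  {
   int msg = 0;
   queue.Pop(msg, true); // blocks until a message is available
   std::cout << "got " << msg << std::endl;
  }
 });

 std::thread producer([&queue]{
  for (int i = 0; i < 5; i++)
   queue.Push(i);        // wakes the consumer via notify_one()
 });

 producer.join();
 consumer.join();
 return 0;
}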

A thread pool can either create its threads directly in the constructor, passing in the callback function, or provide a Run function that is called explicitly. Here we choose the second option; the two are compared below:

An outer loop around the handler function receives messages and calls the handler when one arrives. This implementation adds a layer of encapsulation, but it calls across threads frequently and cannot reuse resources between calls. For example, if the handler performs database operations, it would have to connect and disconnect repeatedly; this can be avoided by defining two virtual functions, PreHandler and AfterHandler, that run once before and after the processing loop.
Note: calling a virtual function in a constructor does not actually call the subclass's implementation!
Even though the call itself works, it is not a virtual call: inside a constructor the function's entry address is resolved statically at compile time, so dynamic dispatch never happens. The workaround here is to write a Run function, move that part of the work into it, and call it explicitly after the object has been constructed.
See Effective C++, Item 9: never call virtual functions during construction or destruction.
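
A minimal sketch of the problem (the Base/Derived names are illustrative, not part of the thread pool below): while Base's constructor runs, the Derived part does not exist yet, so the call resolves to Base::Hook rather than the override.


#include <iostream>

class Base {
public:
 Base() { Hook(); } // resolves to Base::Hook, never Derived::Hook
 virtual ~Base() {}
 virtual void Hook() { std::cout << "Base::Hook" << std::endl; }
};

class Derived : public Base {
public:
 void Hook() override { std::cout << "Derived::Hook" << std::endl; }
};

int main()
{
 Derived d; // prints "Base::Hook"
 d.Hook();  // prints "Derived::Hook" now that construction is complete
 return 0;
}

The thread pool implementation: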

#ifndef THREAD_POOL_H
#define THREAD_POOL_H


#include <functional>
#include <vector>
#include <thread>

#include "MessageQueue.h"

#define MIN_THREADS 1

template<class Type>
class CThreadPool
{
 CThreadPool& operator = (const CThreadPool&) = delete;
 CThreadPool(const CThreadPool& other) = delete;

public:
 CThreadPool(int32_t threads, 
  std::function<void(Type& record, CThreadPool<Type>* pSub)> handler);
 virtual ~CThreadPool();

 void Run();
 virtual void PreHandler(){}
 virtual void AfterHandler(){}
 void Submit(Type record);


private:
 bool _shutdown;
 int32_t _threads;
 std::function<void(Type& record, CThreadPool<Type>* pSub)> _handler;
 std::vector<std::thread> _workers;
 CMessageQueue<Type> _tasks;

};



template<class Type>
CThreadPool<Type>::CThreadPool(int32_t threads, 
 std::function<void(Type& record, CThreadPool<Type>* pSub)> handler)
 :_shutdown(false),
 _threads(threads),
 _handler(handler),
 _workers(),
 _tasks()
{

 // Option 1: create the workers in the constructor. Commented out because the virtual function calls would not behave correctly here (see the note above)
 /*if (_threads < MIN_THREADS)
  _threads = MIN_THREADS;
 for (int32_t i = 0; i < _threads; i++)
 {
  
  _workers.emplace_back(
   [this]{
   PreHandler();
   while (!_shutdown){
    Type record;
    _tasks.Pop(record, true);
    _handler(record, this);
   }
   AfterHandler();
  }
  );
 }*/

}

// Option 2: start the workers from an explicit Run() call
template<class Type>
void CThreadPool<Type>::Run()
{
 if (_threads < MIN_THREADS)
  _threads = MIN_THREADS;
 for (int32_t i = 0; i < _threads; i++)
 {
  _workers.emplace_back(
   [this]{
   PreHandler();
   while (!_shutdown){
    Type record;
    _tasks.Pop(record, true);
    _handler(record, this);
   }
   AfterHandler();
  }
  );
 }
}




template<class Type>
CThreadPool<Type>::~CThreadPool()
{
 for (std::thread& worker : _workers)
  worker.join();
}


template<class Type>
void CThreadPool<Type>::Submit(Type record)
{
 _tasks.Push(record);
}
#endif // !THREAD_POOL_H
