C++11 Multithreaded Programming: How to Implement a Thread-Safe Queue

  • 2020-11-26 18:55:34
  • OfStack

The interface file for the thread-safe queue is as follows:


#include <memory> 
template<typename T>
class threadsafe_queue {
 public:
  threadsafe_queue();
  threadsafe_queue(const threadsafe_queue&);
  threadsafe_queue& operator=(const threadsafe_queue&) = delete;

  void push(T new_value);

  bool try_pop(T& value);
  std::shared_ptr<T> try_pop();

  void wait_and_pop(T& value);
  std::shared_ptr<T> wait_and_pop();

  bool empty() const;
};

push function

The push() function adds an element to the queue. After the element is added, it calls notify_one() on the std::condition_variable to wake one thread that is blocked waiting for data.


void push(T new_value) {
  std::lock_guard<std::mutex> lk(mut);
  // data_queue is a std::queue<T>, so the value itself is stored
  data_queue.push(std::move(new_value));
  // wake one thread blocked in wait_and_pop(), if any
  data_cond.notify_one();
}

wait_and_pop function

The wait_and_pop() function removes an element from the queue. If the queue is empty, the calling thread blocks until another thread pushes data.

Note that both overloads use std::unique_lock rather than std::lock_guard.

This is because the wait function of std::condition_variable must be able to release and reacquire the mutex. It first evaluates the predicate !data_queue.empty(). If the queue is empty, wait releases the mutex and blocks the thread. When push() adds an element and calls notify_one(), the waiting thread wakes up, reacquires the mutex, and checks the predicate again; it continues only if the queue is no longer empty. A std::lock_guard cannot be unlocked and relocked, and wait only accepts a std::unique_lock<std::mutex>, so std::unique_lock is required here.


void wait_and_pop(T& value) {
  std::unique_lock<std::mutex> lk(mut);
  data_cond.wait(lk,[this]{return !data_queue.empty();});
  value=data_queue.front();
  data_queue.pop();
}

std::shared_ptr<T> wait_and_pop() {
  std::unique_lock<std::mutex> lk(mut);
  data_cond.wait(lk,[this]{return !data_queue.empty();});
  std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));
  data_queue.pop();
  return res;
}
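
The predicate form of wait used above behaves like a loop around the plain wait, which is why a spurious wakeup does no harm. The following is a minimal sketch of that equivalence, assuming a free-standing queue of int; the names demo_mutex, demo_queue, demo_cond, pop_with_manual_loop and run_wait_demo are hypothetical and not part of the article's class.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>

// Hypothetical globals, for illustration only.
std::mutex demo_mutex;
std::queue<int> demo_queue;
std::condition_variable demo_cond;

// Same effect as demo_cond.wait(lk, []{ return !demo_queue.empty(); });
int pop_with_manual_loop() {
  std::unique_lock<std::mutex> lk(demo_mutex);
  while (demo_queue.empty()) {  // re-check after every wakeup, spurious or not
    demo_cond.wait(lk);         // releases the mutex while blocked, reacquires it before returning
  }
  assert(!demo_queue.empty());  // guaranteed by the loop; the lock is held here
  int value = demo_queue.front();
  demo_queue.pop();
  return value;
}

// Starts a producer thread, then blocks until its value arrives.
int run_wait_demo() {
  std::thread producer([] {
    std::lock_guard<std::mutex> lk(demo_mutex);
    demo_queue.push(42);
    demo_cond.notify_one();
  });
  int value = pop_with_manual_loop();
  producer.join();
  return value;
}
```

Whether the producer runs before or after the consumer starts waiting, the loop condition is checked with the mutex held, so the pushed value cannot be missed.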

try_pop function

The try_pop() function removes an element from the queue without blocking. On success it returns true or a non-empty shared_ptr; if the queue is empty it returns false or an empty shared_ptr (nullptr).


bool try_pop(T& value) {
  std::lock_guard<std::mutex> lk(mut);
  if(data_queue.empty()) {
    return false;
  } 
  value = data_queue.front();
  data_queue.pop();
  
  return true;
}

std::shared_ptr<T> try_pop() {
  std::lock_guard<std::mutex> lk(mut);
  if(data_queue.empty()) {
    return std::shared_ptr<T>();
  }
  std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));
  data_queue.pop();
  return res;
}
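
As a usage illustration, a caller can drain everything that is currently queued without ever blocking. This is only a sketch: mini_queue is a hypothetical, trimmed-down stand-in for the article's class (push and try_pop only), and drain is an illustrative helper.

```cpp
#include <cassert>
#include <memory>
#include <mutex>
#include <queue>
#include <utility>
#include <vector>

// Hypothetical reduced version of the queue: only push and try_pop.
template <typename T>
class mini_queue {
  mutable std::mutex mut;
  std::queue<T> data_queue;
 public:
  void push(T new_value) {
    std::lock_guard<std::mutex> lk(mut);
    data_queue.push(std::move(new_value));
  }
  bool try_pop(T& value) {
    std::lock_guard<std::mutex> lk(mut);
    if (data_queue.empty()) return false;  // nothing to pop, return immediately
    value = std::move(data_queue.front());
    data_queue.pop();
    return true;
  }
  std::shared_ptr<T> try_pop() {
    std::lock_guard<std::mutex> lk(mut);
    if (data_queue.empty()) return std::shared_ptr<T>();  // empty pointer signals failure
    std::shared_ptr<T> res(std::make_shared<T>(std::move(data_queue.front())));
    data_queue.pop();
    return res;
  }
};

// Pops until try_pop reports an empty queue; never waits for new data.
std::vector<int> drain(mini_queue<int>& q) {
  std::vector<int> out;
  int v = 0;
  while (q.try_pop(v)) out.push_back(v);
  return out;
}
```

The bool overload suits callers that already own a variable to fill; the shared_ptr overload lets the return value itself carry the success or failure signal.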

empty function


bool empty() const {
  std::lock_guard<std::mutex> lk(mut);
  return data_queue.empty();
}

Note that empty() is a const member function, which promises not to modify any member variable. Locking a mutex is a mutating operation, however, so mut must be declared mutable (mutable std::mutex mut).
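
The same rule applies to any class that locks a mutex inside a const member function. A minimal sketch, using a hypothetical guarded_counter class that is not part of the queue:

```cpp
#include <cassert>
#include <mutex>

// Hypothetical class, used only to illustrate `mutable`.
class guarded_counter {
  mutable std::mutex mut;  // mutable: may be locked inside const members
  int count = 0;
 public:
  void increment() {
    std::lock_guard<std::mutex> lk(mut);
    ++count;
  }
  int get() const {
    // Without `mutable`, mut would be const here, and std::lock_guard's
    // constructor, which takes a non-const std::mutex&, would not compile.
    std::lock_guard<std::mutex> lk(mut);
    return count;
  }
};

// Reads the counter through a const reference, as a caller of empty() would.
int read_through_const_ref(const guarded_counter& c) { return c.get(); }
```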

The complete code is as follows:


#include <queue>
#include <memory>
#include <mutex>
#include <condition_variable>
#include <utility>

template<typename T>
class threadsafe_queue {
 private:
   mutable std::mutex mut;  // mutable so that const members such as empty() can lock it
   std::queue<T> data_queue;
   std::condition_variable data_cond;
 public:
   threadsafe_queue(){}
   threadsafe_queue(threadsafe_queue const& other) {
     std::lock_guard<std::mutex> lk(other.mut);
     data_queue=other.data_queue;
   }
   threadsafe_queue& operator=(const threadsafe_queue&) = delete;

   void push(T new_value) {
     std::lock_guard<std::mutex> lk(mut);
     data_queue.push(std::move(new_value));
     data_cond.notify_one();
   }

   void wait_and_pop(T& value) {
     std::unique_lock<std::mutex> lk(mut);
     data_cond.wait(lk,[this]{return !data_queue.empty();});
     value=data_queue.front();
     data_queue.pop();
   }

   std::shared_ptr<T> wait_and_pop() {
     std::unique_lock<std::mutex> lk(mut);
     data_cond.wait(lk,[this]{return !data_queue.empty();});
     std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));
     data_queue.pop();
     return res;
   }

   bool try_pop(T& value) {
     std::lock_guard<std::mutex> lk(mut);
     if(data_queue.empty())
       return false;
     value=data_queue.front();
     data_queue.pop();
     return true;
   }

   std::shared_ptr<T> try_pop() {
     std::lock_guard<std::mutex> lk(mut);
     if(data_queue.empty())
       return std::shared_ptr<T>();
     std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));
     data_queue.pop();
     return res;
   }

   bool empty() const {
     std::lock_guard<std::mutex> lk(mut);
     return data_queue.empty();
   }
};
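
To show how the pieces fit together, here is a small producer/consumer sketch. So that the block compiles on its own, it repeats a trimmed copy of the class (push and wait_and_pop only); sum_via_queue is a hypothetical helper invented for this illustration.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>

// Trimmed copy of the class above: only the members this example needs.
template <typename T>
class threadsafe_queue {
  mutable std::mutex mut;
  std::queue<T> data_queue;
  std::condition_variable data_cond;
 public:
  void push(T new_value) {
    std::lock_guard<std::mutex> lk(mut);
    data_queue.push(std::move(new_value));
    data_cond.notify_one();
  }
  void wait_and_pop(T& value) {
    std::unique_lock<std::mutex> lk(mut);
    data_cond.wait(lk, [this] { return !data_queue.empty(); });
    value = std::move(data_queue.front());
    data_queue.pop();
  }
};

// Hypothetical helper: one producer pushes 1..n, one consumer sums them.
long sum_via_queue(int n) {
  threadsafe_queue<int> q;
  long total = 0;
  std::thread consumer([&q, &total, n] {
    for (int i = 0; i < n; ++i) {
      int v = 0;
      q.wait_and_pop(v);  // blocks until the producer has pushed something
      total += v;
    }
  });
  std::thread producer([&q, n] {
    for (int i = 1; i <= n; ++i) q.push(i);
  });
  producer.join();
  consumer.join();
  return total;  // safe to read: the consumer thread has been joined
}
```

The consumer knows in advance how many items to expect; a real program would more likely need some shutdown signal, which this queue does not provide by itself.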

That is how to implement a thread-safe queue in C++11.

