A quick look at the multithreading model of Boost.Asio

  • 2020-09-16 07:44:17
  • OfStack

There are two ways to support multithreading in Boost.Asio. The first is simpler: each thread owns its own io_service and calls run() on it.

The second way: only one io_service is allocated globally and shared among multiple threads, each of which calls run() on that global io_service.

One I/O Service per thread

Let's examine the first scheme: each thread holds one io_service (the usual practice is to make the number of threads equal to the number of CPU cores). What are the characteristics of this scheme?

1 On multi-core machines, this scheme can make full use of all CPU cores.

2 A socket descriptor is not shared across threads, so no synchronization is needed.

3 Blocking operations must not be performed in an event handler; otherwise they block the thread that runs that io_service, and every other handler queued on it.

Here we implement an AsioIOServicePool that encapsulates the thread pool creation operation:


#include <boost/asio.hpp>
#include <cstddef>
#include <memory>
#include <thread>
#include <vector>

class AsioIOServicePool
{
public:
  using IOService = boost::asio::io_service;
  using Work = boost::asio::io_service::work;
  using WorkPtr = std::unique_ptr<Work>;
  AsioIOServicePool(std::size_t size = std::thread::hardware_concurrency())
    : ioServices_(size),
     works_(size),
     nextIOService_(0)
  {
    for (std::size_t i = 0; i < size; ++i)
    {
      works_[i] = std::unique_ptr<Work>(new Work(ioServices_[i]));
    }
    for (std::size_t i = 0; i < ioServices_.size(); ++i)
    {
      threads_.emplace_back([this, i] ()
                 {
                   ioServices_[i].run();
                 });
    }
  }
  AsioIOServicePool(const AsioIOServicePool &) = delete;
  AsioIOServicePool &operator=(const AsioIOServicePool &) = delete;
  // Return one io_service, chosen in round-robin order
  boost::asio::io_service &getIOService()
  {
    auto &service = ioServices_[nextIOService_++];
    if (nextIOService_ == ioServices_.size())
    {
      nextIOService_ = 0;
    }
    return service;
  }
  void stop()
  {
    for (auto &work: works_)
    {
      work.reset();
    }
    for (auto &t: threads_)
    {
      t.join();
    }
  }
private:
  std::vector<IOService>    ioServices_;
  std::vector<WorkPtr>     works_;
  std::vector<std::thread>   threads_;
  std::size_t         nextIOService_;
};

AsioIOServicePool is also simple to use:


std::mutex mtx;       // protect std::cout
AsioIOServicePool pool;

boost::asio::steady_timer timer{pool.getIOService(), std::chrono::seconds{2}};
timer.async_wait([&mtx] (const boost::system::error_code &ec)
         {
           std::lock_guard<std::mutex> lock(mtx);
           std::cout << "Hello, World! " << std::endl;
         });
pool.stop();
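
One caveat about getIOService(): it advances nextIOService_ without any synchronization, so calling it from several threads at once would be a data race. A minimal sketch of a thread-safe round-robin counter follows, using only the standard library. RoundRobinIndex is a hypothetical helper introduced here for illustration; it is not part of Boost.Asio or of the original pool.

```cpp
#include <atomic>
#include <cstddef>

// Hypothetical helper (not part of Boost.Asio): hands out the indices
// 0, 1, ..., size-1, 0, 1, ... and may be called from several threads.
class RoundRobinIndex
{
public:
  explicit RoundRobinIndex(std::size_t size) : size_(size), next_(0) { }

  std::size_t next()
  {
    // fetch_add returns the previous value atomically, so two concurrent
    // callers never receive the same ticket.
    return next_.fetch_add(1, std::memory_order_relaxed) % size_;
  }

private:
  const std::size_t size_;          // number of io_service objects, must be > 0
  std::atomic<std::size_t> next_;   // ever-increasing ticket counter
};
```

With such a member (say index_, constructed from size), getIOService() could return ioServices_[index_.next()]. The sequence has one irregular step if the counter ever wraps around, which is harmless for load balancing.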

One I/O Service with multiple threads

Another option is to allocate 1 global io_service and then start multiple threads, each calling the run() method of io_service. Thus, when an asynchronous event completes, io_service hands the corresponding event handler to any one thread for execution.

In practice, this scheme has several points to keep in mind:

1 Blocking operations (such as database queries) are allowed in event handlers, because the other threads keep servicing the io_service.

2 The number of threads can be greater than the number of CPU cores. For example, if event handlers perform blocking operations, adding threads improves the program's responsiveness.

3 Because multiple threads run the event loop simultaneously, a single socket descriptor may be shared among threads, which makes race conditions possible. For example, if a socket becomes readable twice in quick succession, two threads may end up reading the same socket at the same time (strand solves this problem).
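
To illustrate point 2, here is a rough sketch that uses plain std::thread workers as a stand-in for threads calling run(), and sleep_for as a stand-in for a blocking handler. No Boost.Asio is involved, and runBlockingHandlers is a hypothetical name chosen for this example.

```cpp
#include <atomic>
#include <chrono>
#include <thread>
#include <vector>

// Runs `tasks` simulated blocking handlers on `workers` threads and
// returns the elapsed wall-clock time in milliseconds.
long long runBlockingHandlers(int workers, int tasks,
                              std::chrono::milliseconds each)
{
  std::atomic<int> nextTask{0};
  auto begin = std::chrono::steady_clock::now();

  std::vector<std::thread> threads;
  for (int i = 0; i < workers; ++i)
  {
    threads.emplace_back([&nextTask, tasks, each] ()
    {
      // Each worker claims the next unclaimed task until none remain.
      while (nextTask.fetch_add(1) < tasks)
      {
        std::this_thread::sleep_for(each);  // simulated blocking call
      }
    });
  }
  for (auto &t : threads) { t.join(); }

  auto elapsed = std::chrono::steady_clock::now() - begin;
  return std::chrono::duration_cast<std::chrono::milliseconds>(elapsed).count();
}
```

Calling runBlockingHandlers(1, 8, std::chrono::milliseconds(30)) and then the same with 4 workers should show the second run finishing noticeably sooner, even on a single core, because sleeping threads do not compete for the CPU.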

The following implements a thread pool that executes the run() method of io_service in each worker thread:


class AsioThreadPool
{
public:
  AsioThreadPool(int threadNum = std::thread::hardware_concurrency())
    : work_(new boost::asio::io_service::work(service_))
  {
    for (int i = 0; i < threadNum; ++i)
    {
      threads_.emplace_back([this] () { service_.run(); });
    }
  }
  AsioThreadPool(const AsioThreadPool &) = delete;
  AsioThreadPool &operator=(const AsioThreadPool &) = delete;
  boost::asio::io_service &getIOService()
  {
    return service_;
  }
  void stop()
  {
    work_.reset();
    for (auto &t: threads_)
    {
      t.join();
    }
  }
private:
  boost::asio::io_service service_;
  std::unique_ptr<boost::asio::io_service::work> work_;
  std::vector<std::thread> threads_;
};
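
Both pool classes rely on the caller invoking stop() exactly once. If a pool is destroyed while its threads are still joinable, the std::thread destructor calls std::terminate, and calling join() a second time on an already joined thread throws std::system_error. A minimal sketch of a thread holder that guards against both follows; JoiningThreads is a hypothetical helper, not something the original classes contain.

```cpp
#include <functional>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical helper: owns worker threads and joins each one at most once.
class JoiningThreads
{
public:
  JoiningThreads() = default;
  JoiningThreads(const JoiningThreads &) = delete;
  JoiningThreads &operator=(const JoiningThreads &) = delete;

  // Joining in the destructor avoids std::terminate on scope exit.
  ~JoiningThreads() { joinAll(); }

  void start(std::function<void()> fn)
  {
    threads_.emplace_back(std::move(fn));
  }

  void joinAll()
  {
    for (auto &t : threads_)
    {
      // joinable() is false after a successful join, so repeated
      // calls to joinAll() are harmless.
      if (t.joinable()) { t.join(); }
    }
  }

private:
  std::vector<std::thread> threads_;
};
```

In a pool built this way, the work object still has to be reset before joining; otherwise run() never returns and the join waits forever.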

Lock-free synchronization

How do you solve the race conditions mentioned earlier? Boost.Asio provides io_service::strand: if multiple event handlers are dispatched through the same strand object, they are guaranteed to execute one at a time, never concurrently.

For example, the following code uses a strand, so no mutex is needed to guarantee synchronization:


AsioThreadPool pool(4);  // start 4 threads
boost::asio::steady_timer timer1{pool.getIOService(), std::chrono::seconds{1}};
boost::asio::steady_timer timer2{pool.getIOService(), std::chrono::seconds{1}};
int value = 0;
boost::asio::io_service::strand strand{pool.getIOService()};

timer1.async_wait(strand.wrap([&value] (const boost::system::error_code &ec)
               {
                 std::cout << "Hello, World! " << value++ << std::endl;
               }));
timer2.async_wait(strand.wrap([&value] (const boost::system::error_code &ec)
               {
                 std::cout << "Hello, World! " << value++ << std::endl;
               }));
pool.stop();
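
To build intuition for how a strand can serialize handlers on a multithreaded event loop, here is a toy model written with the standard library only. It is a sketch of the general idea and makes no claim about how Boost.Asio implements strands; ToyLoop, ToyStrand and countThroughStrand are hypothetical names. The strand keeps its own queue and schedules at most one "drain" job on the loop at a time, so the handlers themselves need no lock.

```cpp
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Toy stand-in for an io_service whose run() is called by several threads.
class ToyLoop
{
public:
  void post(std::function<void()> fn)
  {
    { std::lock_guard<std::mutex> lock(mutex_); queue_.push_back(std::move(fn)); }
    ready_.notify_one();
  }
  // Ask run() to return once the queue is empty.
  void finish()
  {
    { std::lock_guard<std::mutex> lock(mutex_); finished_ = true; }
    ready_.notify_all();
  }
  void run()
  {
    for (;;)
    {
      std::function<void()> fn;
      {
        std::unique_lock<std::mutex> lock(mutex_);
        ready_.wait(lock, [this] () { return finished_ || !queue_.empty(); });
        if (queue_.empty()) { return; }  // finished and nothing left to do
        fn = std::move(queue_.front());
        queue_.pop_front();
      }
      fn();  // run the handler outside the lock
    }
  }
private:
  std::mutex mutex_;
  std::condition_variable ready_;
  std::deque<std::function<void()>> queue_;
  bool finished_ = false;
};

// Toy strand: handlers posted through it never run concurrently.
class ToyStrand
{
public:
  explicit ToyStrand(ToyLoop &loop) : loop_(loop) { }
  void post(std::function<void()> fn)
  {
    bool schedule = false;
    {
      std::lock_guard<std::mutex> lock(mutex_);
      pending_.push_back(std::move(fn));
      if (!draining_) { draining_ = true; schedule = true; }
    }
    // At most one drain() is scheduled or running at any time.
    if (schedule) { loop_.post([this] () { drain(); }); }
  }
private:
  void drain()
  {
    for (;;)
    {
      std::function<void()> fn;
      {
        std::lock_guard<std::mutex> lock(mutex_);
        if (pending_.empty()) { draining_ = false; return; }
        fn = std::move(pending_.front());
        pending_.pop_front();
      }
      fn();  // only one thread is ever here for this strand
    }
  }
  ToyLoop &loop_;
  std::mutex mutex_;
  std::deque<std::function<void()>> pending_;
  bool draining_ = false;
};

// Increments an unsynchronized counter `tasks` times through the strand.
int countThroughStrand(int threadNum, int tasks)
{
  ToyLoop loop;
  ToyStrand strand(loop);
  int counter = 0;  // plain int: the strand serializes every access
  std::vector<std::thread> threads;
  for (int i = 0; i < threadNum; ++i) { threads.emplace_back([&loop] () { loop.run(); }); }
  for (int i = 0; i < tasks; ++i) { strand.post([&counter] () { ++counter; }); }
  loop.finish();
  for (auto &t : threads) { t.join(); }
  return counter;
}
```

The strand's internal mutex is held only while moving a handler in or out of its queue, never while a handler runs, which is why handler code can stay free of explicit locking.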

Multithreaded Echo Server

The following EchoServer can be used with multiple threads. It uses io_service::strand to solve the race problem mentioned earlier:


using boost::asio::ip::tcp;

class TCPConnection : public std::enable_shared_from_this<TCPConnection>
{
public:
  TCPConnection(boost::asio::io_service &io_service)
    : socket_(io_service),
     strand_(io_service)
  { }

  tcp::socket &socket() { return socket_; }
  void start() { doRead(); }

private:
  void doRead()
  {
    auto self = shared_from_this();
    socket_.async_read_some(
      boost::asio::buffer(buffer_, buffer_.size()),
      strand_.wrap([this, self](boost::system::error_code ec,
                   std::size_t bytes_transferred)
             {
               if (!ec) { doWrite(bytes_transferred); }
             }));
  }
  void doWrite(std::size_t length)
  {
    auto self = shared_from_this();
    boost::asio::async_write(
      socket_, boost::asio::buffer(buffer_, length),
      strand_.wrap([this, self](boost::system::error_code ec,
                   std::size_t /* bytes_transferred */)
             {
               if (!ec) { doRead(); }
             }));
  }
private:
  tcp::socket socket_;
  boost::asio::io_service::strand strand_;
  std::array<char, 8192> buffer_;
};
class EchoServer
{
public:
  EchoServer(boost::asio::io_service &io_service, unsigned short port)
    : io_service_(io_service),
     acceptor_(io_service, tcp::endpoint(tcp::v4(), port))
  {
    doAccept();
  }
  void doAccept()
  {
    auto conn = std::make_shared<TCPConnection>(io_service_);
    acceptor_.async_accept(conn->socket(),
                [this, conn](boost::system::error_code ec)
                {
                  if (!ec) { conn->start(); }
                  this->doAccept();
                });
  }

private:
  boost::asio::io_service &io_service_;
  tcp::acceptor acceptor_;
};

That concludes this quick look at the Boost.Asio multithreading model.

