C++11 multithreaded programming: std::async in depth

  • 2020-11-26 18:57:09
  • OfStack

Preface

C++11 provides std::async, a high-level interface for asynchronous programming. Compared with using std::thread directly, std::async has the following advantages:

1. std::async automatically creates a thread (when one is needed) to call the function. Compared with the low-level std::thread, it is much more convenient to use.

2. std::async returns a std::future object, through which we can easily obtain the return value of the function.

3. std::async accepts a launch policy, which specifies whether the function is executed asynchronously on a separate thread or synchronously (deferred) on the calling thread.

1. Function prototype

C++11 provides the following two overloads (the return type is written here with the C++14 alias templates std::result_of_t and std::decay_t for brevity):


template< class Function, class... Args>
std::future<std::result_of_t<std::decay_t<Function>(std::decay_t<Args>...)>>
 async( Function&& f, Args&&... args );

template< class Function, class... Args >
std::future<std::result_of_t<std::decay_t<Function>(std::decay_t<Args>...)>>
 async( std::launch policy, Function&& f, Args&&... args );

The parameter f is a callable object (function object, lambda expression, class member function, ordinary function, ...) to be executed asynchronously or synchronously.

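The parameter policy specifies whether the callable object is executed asynchronously or synchronously. It can take three values:

1) std::launch::async: the callable object is executed asynchronously, as if on a new thread;

2) std::launch::deferred: the callable object is executed synchronously (lazily) on the thread that first calls get or wait on the returned future;

3) std::launch::async | std::launch::deferred: execution may be asynchronous or synchronous, depending on the implementation. This is the policy used by the overload that takes no policy argument.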

Function return value:

The function returns a std::future object. We can call get, wait, wait_for, or wait_until on it to obtain or wait for the result.

When get is called on the std::future object under the asynchronous policy, the call blocks the calling thread until the asynchronous execution has finished. Under the synchronous (deferred) policy, the callable object is only executed when get (or wait) is called.

The wait_for and wait_until functions of the std::future object return one of three states:

1) std::future_status::deferred: the callable object is deferred and has not started executing yet;

2) std::future_status::ready: the callable object has finished executing and the result is available;

3) std::future_status::timeout: the wait timed out before the callable object finished executing;

2. Header files


#include <future>

3. Read file contents synchronously or asynchronously

We simulate reading data asynchronously from a database and synchronously from a file to show how std::async is used.


#include <iostream>
#include <string>
#include <chrono>
#include <thread>
#include <future>
 
using namespace std::chrono;
 
std::string fetchDataFromDB(std::string recvData) {
 std::cout << "fetchDataFromDB start" << std::this_thread::get_id() << std::endl;
 std::this_thread::sleep_for(seconds(5));
 return "DB_" + recvData;
}
 
std::string fetchDataFromFile(std::string recvData) {
 std::cout << "fetchDataFromFile start" << std::this_thread::get_id() << std::endl;
 std::this_thread::sleep_for(seconds(3));
 return "File_" + recvData;
}
 
int main() {
 std::cout << "main start" << std::this_thread::get_id() << std::endl;
 
 // Get start time 
 system_clock::time_point start = system_clock::now();
 
 std::future<std::string> resultFromDB = std::async(std::launch::async, fetchDataFromDB, "Data");
 
 // Get data from a file 
 std::future<std::string> fileData = std::async(std::launch::deferred, fetchDataFromFile, "Data");
 
 // Calling get starts the deferred execution of fetchDataFromFile on this thread 
 std::string FileData = fileData.get();
 // If fetchDataFromDB has not finished yet, get blocks the current thread until it does 
 std::string dbData = resultFromDB.get();
 
 // Get end time 
 auto end = system_clock::now();
 
 auto diff = duration_cast<std::chrono::seconds>(end - start).count();
 std::cout << "Total Time taken= " << diff << "Seconds" << std::endl;
 
 // Assemble the data 
 std::string data = dbData + " :: " + FileData;
 
 // Output the assembled data 
 std::cout << "Data = " << data << std::endl;
 
 return 0;
}

Code output:

[

main start140677737994048
fetchDataFromFile start140677737994048
fetchDataFromDB start140677720131328
Total Time taken= 5Seconds
Data = DB_Data :: File_Data

]

4. Setting a timeout when waiting for asynchronous data

Sometimes we cannot wait indefinitely for an asynchronous task to finish. We can wait with a timeout, and when the timeout expires, we can choose to stop waiting for the task.

In the following code, we wait in 1-second intervals and check after each one whether the asynchronous task has finished executing.


#include <iostream>
#include <string>
#include <chrono>
#include <thread>
#include <future>
 
using namespace std::chrono;
 
std::string fetchDataFromDB(std::string recvData) {
 
 std::cout << "fetchDataFromDB start" << std::this_thread::get_id() << std::endl;
 std::this_thread::sleep_for(seconds(5));
 return "DB_" + recvData;
}
 
 
int main() {
 
 std::cout << "main start" << std::this_thread::get_id() << std::endl;
 
 // Get start time 
 system_clock::time_point start = system_clock::now();
 
 std::future<std::string> resultFromDB = std::async(std::launch::async, fetchDataFromDB, "Data");
 
 std::future_status status;
 std::string dbData;
 do
 {
  status = resultFromDB.wait_for(std::chrono::seconds(1));
 
  switch (status)
  {
  case std::future_status::ready:
   std::cout << "Ready..." << std::endl;
   // Get the result 
   dbData = resultFromDB.get();
   std::cout << dbData << std::endl;
   break;
  case std::future_status::timeout:
   std::cout << "timeout..." << std::endl;
   break;
  case std::future_status::deferred:
   std::cout << "deferred..." << std::endl;
   break;
  default:
   break;
  }
 
 } while (status != std::future_status::ready);
 
 
 // Get end time 
 auto end = system_clock::now();
 
 auto diff = duration_cast<std::chrono::seconds>(end - start).count();
 std::cout << "Total Time taken= " << diff << "Seconds" << std::endl;
 
 return 0;
}

Program output:

[

main start140406593357632
fetchDataFromDB start140406575482624
timeout...
timeout...
timeout...
timeout...
Ready...
DB_Data
Total Time taken= 5Seconds

]

5. Using std::async for multithreaded concurrency

Since std::async can make asynchronous calls, we can easily use it to run work concurrently on multiple threads.


#include <iostream>
#include <vector>
#include <algorithm>
#include <numeric>
#include <future>
#include <string>
#include <mutex>
#include <thread>
 
template <typename RandomIt>
int parallel_sum(RandomIt beg, RandomIt end)
{
 std::cout << "thread id:" << std::this_thread::get_id() << std::endl;
 
 auto len = end - beg;
 if (len < 1000)
  return std::accumulate(beg, end, 0);
 
 RandomIt mid = beg + len/2;
 auto handle_me = std::async(std::launch::async,
        parallel_sum<RandomIt>, mid, end);
 auto handle_bm = std::async(std::launch::async,
        parallel_sum<RandomIt>, beg, mid);
 // Alternative: sum one half on the current thread instead of launching a second task: 
 // int sum = parallel_sum(beg, mid);
 return handle_bm.get() + handle_me.get();
}
 
int main()
{
 std::vector<int> v(10000, 1);
 std::cout << "The sum is " << parallel_sum(v.begin(), v.end()) << std::endl;
}

Program output is as follows:

[

The sum is thread id:140594794530624
thread id:140594776655616
thread id:140594768262912
thread id:140594759870208
thread id:140594672297728
thread id:140594680690432
thread id:140594663905024
thread id:140594655512320
thread id:140594647119616
thread id:140594638726912
thread id:140594269644544
thread id:140594630334208
thread id:140594278037248
thread id:140594252859136
thread id:140594261251840
thread id:140594252859136
thread id:140594236073728
thread id:140594252859136
thread id:140594261251840
thread id:140594630334208
thread id:140594244466432
thread id:140594252859136
thread id:140594227681024
thread id:140594261251840
thread id:140593875384064
thread id:140593850205952
thread id:140593858598656
thread id:140593866991360
thread id:140594647119616
thread id:140594269644544
thread id:140594672297728
10000

]

6. Other precautions

Note that the destructor of a std::future returned by std::async (with the std::launch::async policy) waits for the task to finish. As a result, the following code does not run concurrently: the std::future returned by each std::async call is not stored, so it is a temporary whose destructor blocks until that asynchronous task has completed.


std::async(std::launch::async, []{ f(); }); // temporary's dtor waits for f()
std::async(std::launch::async, []{ g(); }); // does not start until f() completes

Reference material

https://en.cppreference.com/w/cpp/thread/async

www.ofstack.com/article/198761.htm
