1. 程式人生 > >C++併發實戰13:std::future、std::async、std::promise、std::packaged_task

C++併發實戰13:std::future、std::async、std::promise、std::packaged_task

          std::condition_variable可以用於非同步事件的重複通知,但是有些時候可能只等待事件發生一次,比如:等待特定的航班,用條件變數大殺器有點浪費了。C++11標準庫提供了幾種非同步任務機制。通常thread不能返回執行緒執行的結果(可以通過引用引數返回),而在非同步處理當中很多時候都需要獲得計算的結果。如果只獲取結果一次那麼選用future,即通過future獲取了結果後,後續再通過此future獲取結果將會出錯。

(1) future,async,packaged_task,promise用法簡介

         std::future可用於非同步任務中獲取任務結果,但是它只是獲取結果而已,真正的非同步呼叫需要配合std::async, std::promise, std::packaged_task。這裡async是個模板函式,promise和packaged_task是模板類,通常模板例項化引數是任務函式(callable object)。下面是它們的部分組合用法:假設計算任務 int task(string x);

                 1  async+future簡單用法

future<int> myFuture=async(task,10)  

                        //這裡async自動建立一個後臺執行緒(可以選取一個空閒的執行緒)執行任務task函式,並將計算結果儲存在myFuture中,這裡future的模板引數要和任務task返回型別一致為int。怎樣獲得任務結果呢?通常原來的執行緒(即建立myFuture的執行緒記為A,不是async執行task那個執行緒)可以執行其它操作,直到其想要獲取task的結果時呼叫int x=myFuture.get()即可獲得task的執行結果並儲存至x中。注意若task沒有執行完就呼叫了myFuture.get()那麼執行緒A將會阻塞直到task完成。

               2 packaged_task+future簡單用法

packaged_task<int(int)> myPackaged(task);//首先建立packaged_task物件myPackaged其內部建立一個函式task和一個共享狀態(用於返回task的結果)
future<int> myFuture=myPackaged.get_future();//通過packaged_task::get_future()返回一個future物件myFuture用於獲取task的任務結果
thread myThread(move(myPackaged),"hello world");//建立一個執行緒執行task任務,這裡注意move語義強制將左值轉為右值使用因為packaged_task禁止copy constructor,可以不建立執行緒,那麼task任務的執行將和future結果的獲取在同一個執行緒,這樣就不叫非同步了
//這裡主執行緒可以做其它的操作
int x=myFuture.get();//執行緒還可以在執行一些其它操作,直到其想獲取task的結果時呼叫此語句

               3 promise+future簡單用法,摘自cplusplus的範例:

#include <iostream>       // std::cout
#include <functional>     // std::ref
#include <thread>         // std::thread
#include <future>         // std::promise, std::future
void print_int (std::future<int>& fut) {
  int x = fut.get();//當promise::set_value()設定了promise的共享狀態值後,fut將會通過future::get()獲得該共享狀態值,若promise沒有設定該值那麼fut.get()將會阻塞執行緒直到共享狀態值被promise設定
  std::cout << "value: " << x << '\n';//輸出:value: 10
}
int main ()
{
  std::promise<int> prom;                      //建立一個promise物件
  std::future<int> fut = prom.get_future();    //獲取promise內部的future,fut將和promise共享promise中的共享狀態,該共享狀態用於返回計算結果
  std::thread th1 (print_int, std::ref(fut));  //建立一個執行緒,並通過引用方式將fut傳到print_int中
  prom.set_value (10);                         //設定共享狀態值
                                               //
  th1.join();//等待子執行緒
  return 0;
}
                            //從這個例子可以看出promise+future和前面幾個有所不同。

         將主執行緒即需要task結果的執行緒稱為provider,稱執行任務task或上面print_int的執行緒為executor(這裡只是為了後面表述方便,沒有科學考證的)。從上面的例子可以看出,簡單的同步機制都是通過設定某種共享狀態然後通過future獲取該共享狀態達到同步。    

          async通過建立或者選取一個當前空閒執行緒執行任務,然後將計算結果儲存至與此async相關的future中,期間只有存取結果值,沒有其它的互動,並且是provider持有future,executor執行任務。

           packaged_task是一個物件其內部持有callable object,provider建立一個下執行緒executor執行任務,最後provider通過相關的future獲取任務計算結果。和async差不多。只有任務結果的存取,沒有其它互動。

           promise是provider持有,executor持有相關的future,然後provider通過promise設定共享狀態的值,future獲取該共享值後執行某些任務。形式上和前面兩個有點相反。



(2) 細看future,async,packaged_task,promise

           2.1  future可以獲取計算的結果,用於不同執行緒間的簡單同步,future的建立方式:async, packaged_task::get_future ,  promise::get_future這三種返回有效的future,這裡有效是指future和某個共享狀態關聯。

future() noexcept;//建立一個空的future,其不和任何共享狀態相關,注意該future是invalid的,但是其可以move
future (const future&) = delete;//禁止拷貝構造
future (future&& x) noexcept;//具有move語義
~future();//解除和某個共享狀態的關聯,若該future是最後一個和共享狀態關聯的則共享狀態也被銷燬,因為future是禁止拷貝的,所以這裡最後一個可以理解為該future是valid的(和某個共享狀態關聯)
future& operator= (future&& rhs) noexcept;//移動賦值,若rhs是valid的那麼賦值後rhs將不再和該共享狀態關聯,賦值後的future和該共享狀態關聯
future& operator= (const future&) = delete;//禁止拷貝賦值
shared_future<T> share();//返回一個shared_future,shared_future允許多個shared_future和共享狀態關聯並且可以多次get,而future只允許一個future和共享狀態關聯。呼叫share()的future不再和共享狀態關聯,如future<int> f=async(task); shared_future<int> sh=f.share(); f.get(); f.get()*2;
bool valid() const noexcept;//若future和共享狀態關聯則返回true,否則返回false
T get();//若future和某個T型別的共享狀態關聯,那麼呼叫future::get(),若該狀態還沒有準備好阻塞執行緒直到準備好,若狀態準備好了,則返回該狀態值,並將future和共享狀態解綁,此future將invalid
void wait() const;//阻塞等待共享狀態就緒
future_status wait_for (const chrono::duration<Rep,Period>& rel_time) const;//在rel_time時間內等待共享狀態值就緒
future_status wait_until (const chrono::time_point<Clock,Duration>& abs_time) const;//直到abs_time時刻等待共享狀態就緒

future_status:


            2.2 async開啟後臺執行緒執行任務

async (Fn&& fn, Args&&... args);//自動選擇執行緒執行任務fn,args是fn的引數,若fn是某個物件的非靜態成員函式那麼第一個args必須是物件的名字,後面的args是fn所需的引數

async (launch policy, Fn&& fn, Args&&... args);//有三種方式policy執行任務fn
policy=launch::async表示開啟一個新的執行緒執行fn
policy=launch::deferred 表示fn推遲到future::wait/get時才執行
policy=launch::async|launch::deferred表示由庫自動選擇哪種機制執行fn,和第一種構造方式async(fn,args)策略相同


           2.3 packaged_task類似與std::function但是其允許非同步存取結果,其內部持有一個函式呼叫和共享狀態,該共享狀態可以被packaged_task返回的future獲取。

packaged_task() noexcept;//空的packaged_task物件,沒有共享狀態和內部函式
explicit packaged_task (Fn&& fn);//內部有共享狀態和函式fn
explicit packaged_task (allocator_arg_t aa, const Alloc& alloc, Fn&& fn);//共享狀態通過alloc分配記憶體(該共享狀態可能是個buffer)
packaged_task (const packaged_task&) = delete;//禁止拷貝構造
packaged_task (packaged_task&& x) noexcept;//具有移動語義

~packaged_task();//丟棄共享狀態,若還有future和共享狀態關聯,那麼共享狀態不會被銷燬直到future銷燬,如果析構發生時共享狀態還沒有被設定那麼析構將設定共享狀態並在狀態里加入異常

packaged_task& operator= (packaged_task&& rhs) noexcept;//rhs的共享狀態將被移動,rhs將沒有共享狀態
ackaged_task& operator= (const packaged_task&) = delete;//禁止拷貝

bool valid() const noexcept;//若packaged_task內部有共享狀態則返回true,否則返回false
future<Ret> get_future();//返回一個future用以獲得共享狀態,該函式只能被呼叫一次

void operator()(Args... args);//執行fn,若成功則將結果寫入共享狀態,若失敗則寫入異常到共享狀態,通過future::get()可以獲取該狀態
void make_ready_at_thread_exit (args... args);//結果介入共享狀態,但是在該函式所在的呼叫執行緒結束後才使共享狀態就緒,即該執行緒結束後future::get()才能獲取狀態值,若在寫入狀態值和執行緒沒有退出期間有寫入該狀態的行為將丟擲future_error的異常

void swap (packaged_task& x) noexcept;//交換兩個packaged_task的內部share state 和 callable object

 

          2.4 promise可以存入一個共享狀態值,相關的std::future可以獲取該值

promise();//空的promise物件,沒有共享狀態值
template <class Alloc> promise (allocator_arg_t aa, const Alloc& alloc);//Alloc將為共享狀態值開闢記憶體
promise (const promise&) = delete;//禁止拷貝賦值
romise (promise&& x) noexcept;//具備移動語義
~promise();//和~packaged_task()語義一樣

promise& operator= (promise&& rhs) noexcept;//移動賦值,rhs不再有共享狀態
promise& operator= (const promise&) = delete;

future<T> get_future();//返回一個future和共享狀態關聯,可以通過此future獲取共享狀態的值或異常,該函式只能被呼叫一次

void set_value (const T& val);//設定共享狀態的值
void set_value (const T& val);
void promise<R&>::set_value (R& val);   // when T is a reference type (R&)
void promise<void>::set_value (void);   // when T is void

void set_exception (exception_ptr p);//設定異常指標p到共享狀態中,若狀態關聯的future::get()會獲得該異常(並解除阻塞)

void set_value_at_thread_exit (const T& val);//和packaged_task::make_ready_at_thread_exit()語義一樣
void set_value_at_thread_exit (T&& val);	
void promise<R&>::set_value_at_thread_exit (R& val);     // when T is a reference type (R&)
void promise<void>::set_value_at_thread_exit (void);     // when T is void

void set_exception_at_thread_exit (exception_ptr p);//設定異常到共享狀態中,但是線上程結束時才使共享狀態就緒


void swap (promise& x) noexcept;//交換兩個promise物件的共享狀態

          2.5 shared_future和future的區別是:一個future物件和一個共享狀態相關,且轉移只能通過move語義。但是多個shared_future物件可以和共享狀態相關(即多對一)。std::shared_future 與 std::future 類似,但是 std::shared_future 可以拷貝、多個 std::shared_future 可以共享某個共享狀態的最終結果(即共享狀態的某個值或者異常)。shared_future 可以通過某個 std::future 物件隱式轉換(參見 std::shared_future 的建構函式),或者通過 std::future::share() 顯示轉換,無論哪種轉換,被轉換的那個 std::future 物件都會變為 not-valid.

shared_future() noexcept;
shared_future (const shared_future& x);
shared_future (shared_future&& x) noexcept;
shared_future (future<T>&& x) noexcept;
下面的成員函式和future差不多:
operator=
   賦值操作符,與 std::future 的賦值操作不同,std::shared_future 除了支援 move 賦值操作外,還支援普通的賦值操作。
get
   獲取與該 std::shared_future 物件相關聯的共享狀態的值(或者異常)。
valid
   有效性檢查。
wait
   等待與該 std::shared_future 物件相關聯的共享狀態的標誌變為 ready。
wait_for
   等待與該 std::shared_future 物件相關聯的共享狀態的標誌變為 ready。(等待一段時間,超過該時間段wait_for 返回。)
wait_until
   等待與該 std::shared_future 物件相關聯的共享狀態的標誌變為 ready。(在某一時刻前等待,超過該時刻 wait_until 返回。)


 3) 通常執行緒池採用模板實現時各執行緒執行的都是相同型別的任務,若採用packaged_task可以將不同型別的函式物件封轉在其內部,每個執行緒取走一個packaged_task執行,那麼執行緒池執行的任務可以不同。

       下面是一個GUI中一個執行緒專門接收使用者任務並壓入任務佇列,另一個執行緒專門執行使用者任務

std::mutex m;
std::deque<std::packaged_task<void()> > tasks;
bool gui_shutdown_message_received();
void get_and_process_gui_message();
void gui_thread()
{
   while(!gui_shutdown_message_received())//不斷獲取使用者任務
   {
      get_and_process_gui_message();
      std::packaged_task<void()> task;
      {
          std::lock_guard<std::mutex> lk(m);
          if(tasks.empty())
              continue;
          task=std::move(tasks.front());//
          tasks.pop_front();
      }
      task();
   }
}
std::thread gui_bg_thread(gui_thread);
template<typename Func>
std::future<void> post_task_for_gui_thread(Func f)//新增任務
{
   std::packaged_task<void()> task(f);
   std::future<void> res=task.get_future();
   std::lock_guard<std::mutex> lk(m);
   tasks.push_back(std::move(task));//
   return res;
}