【C++11】非同步執行之既有函式的包裝:packaged_task類和async方法
上篇中講到,C++11的標準庫提供了promise用於線上程執行的具體方法中返回資料,接收端通過future阻塞獲取。這麼做的前提是你可以修改方法的引數,或者說你需要寫一個包裝函式。想要讓既有函式非同步的話,你可以使用packaged_task類或者async方法。
具體分析之前,以下程式碼是線上程中需要執行的方法。
MyString some_function() { return MyString{"foo"}; }
MyString是很早之前自己用來檢視copy/move次數的類,不想用的話,可以替換為std::string。
packaged_task
packaged_task是一個封裝了被呼叫的函式的task。注意,packaged_task本身並不提供非同步執行的機制,所以你仍舊需要把packaged_task放到thread中去執行。
std::packaged_task<MyString()> task{some_function}; std::future<MyString> future = task.get_future(); std::thread task_thread{std::move(task)}; task_thread.join(); MyString string = future.get(); std::cout << string << std::endl;
從設計角度來說,packaged_task是橋接了被包裝的函式(這裡是some_function)和future,同時又能被thread執行的一個類,所以實現上,需要能執行(過載operator()),持有被包裝函式的引用或指標。
為了進一步理解,考慮實現一個單一所有權的MyTask
template<class C> class MyTask; template<class R, class ...Args> class MyTask<R(Args...)> { std::function<R(Args...)> function_; MyFuture<R> future_; public: template<class F> MyTask(F &&f) : function_{std::forward<F>(f)} {} // no copy MyTask(const MyTask &) = delete; MyTask &operator=(const MyTask &) = delete; // move is ok MyTask(MyTask &&task) : function_{std::move(task.function_)}, future_{std::move(task.future_)} { } MyTask &operator=(MyTask &&) = delete; MyFuture<R> get_future() { return future_; } void operator()(Args... &&args) { future_.set(function_(std::forward<Args>(args)...)); } };
注意,如果你要實現類似packaged_task的模版類的話,你需要一個只有一個引數的模版,然後再是一個R(Args…)的模版。不這麼做的話你會得到一個編譯錯誤。從模版類角度來說,第二個模版類是第一個的具體化。
packaged_task用內部自己的方式儲存了被包裝函式的指標,這裡使用std::function代替。
future使用前一節的MyFuture,支援複製和轉移。
MyTask<MyString()> task{some_function}; MyFuture<MyString> future = task.get_future(); std::thread task_thread{std::move(task)}; task_thread.join(); MyString string = future.get(); std::cout << string << std::endl;
執行上述程式碼,結果和packaged_task的類似。
作為參考,packaged_task的實際程式碼中,儲存了function和promise,整理結構和MyTask類似。
async
雖然packaged_task能夠包裝需要非同步執行的函式,但是仍舊需要你自己操作thread。為此C++11的標準庫裡提供了另外一個方法級別的非同步執行工具:async。
std::future<MyString> future = std::async(some_function); MyString string = future.get(); std::cout << string << std::endl;
可以看到,程式碼量比packaged_task要少,而且直接返回我們需要的future。
雖然async可以用很少的程式碼非同步執行,但是需要考慮
- async是不是非同步執行?
- 什麼時候執行?
- 是否支援執行緒池?
- 呼叫async之後執行緒會怎麼樣?
在閱讀了async的文件和async本身程式碼之後的回答
- 是的
- 可以引數指定,async為呼叫時開始執行,deferred是在呼叫future的get時執行
- 不支援
- 引數為async時建立執行緒並detach。deferred並不建立thread,只在第一次呼叫future的get時在呼叫執行緒中執行。嚴格來說,deferred不算非同步呼叫
理解以上幾點對使用async方法很重要。
從行為上來看,async並不屬於packaged_task的封裝版。而且從所有權上來看,被包含的function必須被async的返回值future所持有。
實際程式碼其實也是這樣設計的。具體來說,由於future必須允許複製,future持有一個關聯狀態。這個關聯狀態擁有類似shared_ptr的行為,比如說之前的MyFutureInner。在保持MyFutureInner行為的同時,增加一個function欄位,並且啟動一個執行緒呼叫MyFutureInner的set方法就可以實現async。換句話說,需要從MyFutureInner派生一個子類。以下是實現
template<class R> class MyFutureInner { protected: R value_; bool value_set_ = false; std::mutex mutex_; std::condition_variable condition_; std::atomic_int count_; public: MyFutureInner() : count_{1} {} void increase_count() { count_.fetch_add(1, std::memory_order_relaxed); } int decrease_count() { return count_.fetch_sub(1, std::memory_order_acq_rel) - 1; } void set(R &&value) { std::unique_lock<std::mutex> lock{mutex_}; value_ = std::move(value); value_set_ = true; condition_.notify_all(); } virtual R get() { std::unique_lock<std::mutex> lock{mutex_}; if (!value_set_) { condition_.wait(lock); } return std::move(value_); } }; template<class R, class...Args> class MyFutureInnerWithFunction : public MyFutureInner<R> { std::function<R(Args...)> function_; public: MyFutureInnerWithFunction(std::function<R(Args...)> &&function) : MyFutureInner<R>{}, function_{std::move(function)} {} void execute() { this->set(function_()); } }; template<class R> class MyFuture { MyFutureInner<R> *inner_ptr_; public: MyFuture() : inner_ptr_{new MyFutureInner<R>{}} {} explicit MyFuture(MyFutureInner<R> *inner_ptr) : inner_ptr_{inner_ptr} {} MyFuture(const MyFuture &future) : inner_ptr_{future.inner_ptr_} { std::cout << "MyFuture(copy)\n"; inner_ptr_->increase_count(); } MyFuture &operator=(const MyFuture &) = delete; MyFuture(MyFuture &&future) { std::cout << "MyFuture(move)\n"; inner_ptr_ = future.inner_ptr_; future.inner_ptr_ = nullptr; } MyFuture &operator=(MyFuture &&) = delete; R get() { return inner_ptr_->get(); } ~MyFuture() { if (inner_ptr_ != nullptr && inner_ptr_->decrease_count() == 0) { delete inner_ptr_; } } };
注意這裡的MyFutureInner和前篇有所不同,成員變數改成可以被子類訪問的protected,get方法也加了virtual。
MyFutureInner的子類裡增加了成員function,還有一個方法execute。
接下來是async方法的實現
template<class F, class... Args> auto my_async(F &&f, Args &&... args) -> MyFuture<decltype(f(args...))> { typedef decltype(f(args...)) R; std::function<R(Args...)> function{std::forward<F>(f), std::forward<Args>(args)...}; typedef MyFutureInnerWithFunction<R, Args...> FIWF; std::unique_ptr<FIWF> inner_ptr{new FIWF{std::move(function)}}; std::thread async_thread{&FIWF::execute, inner_ptr.get()}; async_thread.detach(); return MyFuture<R>{inner_ptr.release()}; }
這裡方法的簽名參考stackoverflow的一個問題 。
方法內,用MyFutureInnerWithFunction的指標傳入MyFuture。同時,方法內啟動一個thread並且detach。這裡如果不detach的話,thread在方法結束後會被意外銷燬掉,這不是我們想要的,所以必須detach。
執行程式碼
MyFuture<MyString> future = my_async(some_function); MyString string = future.get(); std::cout << string << std::endl;
結果和async是一樣的。
這裡考慮一個問題,假設持有MyFuture的呼叫執行緒沒有呼叫get直接結束的話會發生什麼。由於只有MyFuture持有MyFutureInner的指標,MyFutureInner會被刪除。非同步執行緒訪問時MyFutureInnerWithFunction會是一個無效的記憶體地址。以下是再現的程式碼
MyString some_function() { std::this_thread::sleep_for(std::chrono::milliseconds{500}); return MyString{"foo"}; } void test_future() { MyFuture<MyString> future = my_async(some_function); } int main() { test_future(); std::this_thread::sleep_for(std::chrono::milliseconds{1000}); return 0; }
這裡根本的原因是MyFutureInnerWithFunction擁有者不只是MyFuture,還有非同步執行緒。那樣的話,就不能由MyFuture負責刪除MyFutureInner,同樣也不能由MyFutureInnerWithFunction裡面函式負責刪除,所以只能讓MyFutureInner自己刪除自己,也就是delete this。
修改之後的MyFutureInner和MyFutureInnerWithFunction
template<class R> class MyFutureInner { protected: std::atomic_int count_; R value_; bool value_set_ = false; std::mutex mutex_; std::condition_variable condition_; public: MyFutureInner() : count_{1} {} void increase_count() { count_.fetch_add(1, std::memory_order_relaxed); } void decrease_count() { if (count_.fetch_sub(1, std::memory_order_acq_rel) == 1) { on_zero_shared(); } } virtual void on_zero_shared() { delete this; } void set(R &&value) { std::unique_lock<std::mutex> lock{mutex_}; value_ = std::move(value); value_set_ = true; condition_.notify_all(); } virtual R get() { std::unique_lock<std::mutex> lock{mutex_}; if (!value_set_) { condition_.wait(lock); } return std::move(value_); } }; template<class R, class...Args> class MyFutureInnerWithFunction : public MyFutureInner<R> { std::function<R(Args...)> function_; typedef MyFutureInner<R> base; public: explicit MyFutureInnerWithFunction(std::function<R(Args...)> &&function) : MyFutureInner<R>{}, function_{std::move(function)} { this->increase_count(); } void execute() { this->set(function_()); this->decrease_count(); } };
另外一個解決方案,是在MyFutureInnerWithFunction裡覆蓋MyFutureInner的decrease_count(或者修改後的on_zero_shared),等待函式完成才能被銷燬。
template<class R, class...Args> class MyFutureInnerWithFunction : public MyFutureInner<R> { std::function<R(Args...)> function_; typedef MyFutureInner<R> base; public: explicit MyFutureInnerWithFunction(std::function<R(Args...)> &&function) : MyFutureInner<R>{}, function_{std::move(function)} { } void on_zero_shared() { wait_set(); delete this; } void execute() { this->set(function_()); } private: void wait_set() { std::unique_lock<std::mutex> lock{base::mutex_}; if (!base::value_set_) { base::condition_.wait(lock); } } };
這也是async方法裡面返回的future裡的實現。也就是說,async返回的future即使你不呼叫get,你可以不能直接從包含有future的當前方法返回,你會被阻塞住。如果這不是你要的行為,你可以需要自己寫一個類似前一種方法的MyFutureInner,也就是增減shared count。
最後,deferred async由於不涉及非同步執行緒,實現比較簡單。
template<class R, class...Args> class MyFutureInnerWithFunctionDeferred : public MyFutureInner<R> { bool executed = false; std::function<R(Args...)> function_; typedef MyFutureInner<R> base; public: explicit MyFutureInnerWithFunctionDeferred(std::function<R(Args...)> &&function) : MyFutureInner<R>{}, function_{std::move(function)} { } R get() { std::unique_lock<std::mutex> lock{base::mutex_}; if (!executed) { lock.unlock(); this->set(function_()); executed = true; } else if (!base::value_set_) { base::condition_.wait(lock); } return std::move(base::value_); } };
以及呼叫程式碼
template<class F, class... Args> auto my_async_deferred(F &&f, Args &&... args) -> MyFuture<decltype(f(args...))> { typedef decltype(f(args...)) R; std::function<R(Args...)> function{std::forward<F>(f), std::forward<Args>(args)...}; return MyFuture<R>(new MyFutureInnerWithFunctionDeferred<R, Args...>{std::move(function)}); } int main() { MyFuture<MyString> future = my_async_deferred(some_function); std::cout << future.get() << std::endl; return 0; }
小結
總得來說,C++11的標準庫提供了好幾種非同步執行的方式,各有各的適用場景。比如說promise適合自己封裝非同步執行的函式,packaged_task用於封裝既有的函式,但是執行緒排程要自己來做,async看起來最簡單,但是你必須理解async返回的future的行為。
不過老實說,async還不是很理想,比如沒有執行緒池,以及future的行為不可調整。但是作為理解如何構建高層次多執行緒處理很有幫助。
最後,希望本文對各位有幫助。