1. 程式人生 > >C++11學習筆記-----獲取非同步操作執行結果

C++11學習筆記-----獲取非同步操作執行結果

在多執行緒環境中,不管是傳遞lambda還是傳遞函式指標,再或者是傳遞函式物件給std::thread,都很難獲取執行函式返回值。在以前,只能將結果以引用的形式作為執行緒函式引數的一部分以此儲存返回值,但是仍然存在很大侷限性,甚至不太美觀。C++11引入的std::future可以有效解決這一問題。

std::future定義在標頭檔案<future>中,提供了一種獲取非同步操作返回值的機制,不過通常與下列三個配合使用

  • std::promise
  • std::packaged_task
  • std::async

這三個操作各有不同,但是都有一個共同點就是都提供了get_future介面用於獲得與之關聯的future,使用者(主執行緒)可以通過返回的future獲得非同步操作結果。

std::promise

簡單來說,promise是一種用於訊息傳遞的機制,或者說是提供儲存值和異常的設施。當建立執行緒時可以將promise引用傳給執行緒函式,當線上程函式(非同步操作)中計算得知了主執行緒想要的結果後通過promise::set_value*等介面設定值(如果出現異常也可以設定異常)。而主執行緒可以通過從promise獲取的future獲取結果

示例:利用std::future和std::promise實現併發std::find函式

和併發std::accumulate的實現類似,首先計算合適的執行緒數,將給定區間拆分成若干小區間,並行執行查詢操作,當找到結果後,通過std::promise設定查詢結果,而主執行緒則通過std::future獲取結果

#include <future>
#include <thread>
#include <vector>
#include <algorithm>
#include <cassert>

namespace parallel
{
    template <class InputIt, class T>
    InputIt find(InputIt first, InputIt last, const T& value)
    {
        /* 
         * 計算合適的執行緒數
         * std::thread::hardware_concurrency()用於返回當前系統支援的併發數
         */
auto count = std::distance(first, last); auto avaThreadNums = std::thread::hardware_concurrency(); auto perThreadMinNums = 20; auto maxThreadNums = ((count + (perThreadMinNums - 1)) & (~(perThreadMinNums - 1))) / perThreadMinNums; auto threadNums = avaThreadNums == 0 ? maxThreadNums : std::min(static_cast<int>(maxThreadNums), static_cast<int>(avaThreadNums)); auto blockSize = count / threadNums; /* 主執行緒建立std::promise例項,模板引數是返回值型別 */ std::promise<InputIt> result; /* 因為不同執行緒會併發查詢,當一個執行緒找到後其他執行緒就可以停止查找了,原子變數done用於標記是否找到 */ std::atomic<bool> done(false); { std::vector<std::thread> threads; auto front = first; for(int i = 0; i < threadNums; ++i) { auto back = front; if(i != threadNums - 1) std::advance(back, blockSize); else back = last; threads.emplace_back( [front, back, &value, &result, &done] { /* 當一個執行緒找到後所有執行緒都會退出,通過done標記管理 */ for(auto it = front; !done && it != back; ++it) { if(*it == value) { done.store(true); /* 如果找到,記錄找到的值 */ result.set_value(it); return; } } } ); } /* 回收執行緒資源 */ for(auto &th : threads) th.join(); } /* 通過std::promise::get_future獲得std::future物件,然後呼叫get獲取結果 */ return done ? result.get_future().get() : last; } } int main() { std::vector<int> v(100000000); int n = 0; std::generate(v.begin(), v.end(), [&n] { return ++n; }); auto value = std::random_device()() % 65536; auto it1 = parallel::find(v.begin(), v.end(), value); auto it2 = std::find(v.begin(), v.end(), value); assert(it1 == it2); return 0; }

本例中同時併發了多個執行緒執行find操作,而最後只需要獲取找到結果的那個執行緒返回的值,不管哪個執行緒找到結果,都可以記錄在std::promise例項中,最終通過std::future返回

當然,使用std::promise的做法和給執行緒函式傳入引用記錄結果的做法基本相同,不過std::promise的功能不僅僅侷限於此,使用起來也更加容易,結構更加清晰

std::packaged_task

std::packaged_task用於包裝任何可呼叫物件,無非就是函式指標,函式物件,lambda等,功能類似於std::function,但是packaged_task可以通過返回的future獲取非同步操作的結果。

舉個例子,當存在一個函式,而這個函式通常會被其它執行緒執行時,那麼想要獲取這個函式的返回值就是件困難的事情,以std::function為例,假設在一個執行緒池中,主執行緒通過std::function包裝了一個函式,新增到任務佇列中,隨後執行緒池中其它執行緒取出這個任務函式並開始執行,在這種情況下,主執行緒是很難獲取這個函式的返回值的。換做std::packaged_task就不同了,它可以通過get_future介面獲取std::future例項,正如先前所說,std::future用於獲取非同步操作的結果,所以無論函式由誰執行,都可以通過std::future::get介面獲取返回值

示例:利用std::packaged_task實現向執行緒池中新增任務

在介紹std::thread的那一篇中,涉及到了執行緒池的實現,藉著對std::packaged_task的理解,重新實現一下向任務佇列中新增任務的函式,同時需要確保呼叫者能夠獲取任務函式返回的結果,這裡可以返回給呼叫者一個std::future例項。另外,獲取std::future例項有三種方法,其中涉及到函式包裝的是std::packaged_task,所以在新增任務時,將任務函式包裝在packaged_task中,返回future

template <class F, class... Args>
auto ThreadPool::enqueue(F&& f, Args... args)
        -> std::future<typename std::result_of<F(Args...)>::type>
{
    /* 獲取函式f的返回結果,因為std::future模板引數需要儲存結果型別 */
    using return_type = typename std::result_of<F(Args...)>::type;
    /* std::packaged_task不允許複製,所以用指標儲存 */
    /* std::bind()返回可呼叫物件,包裝在packaged_task中 */
    auto task = std::make_shared<std::packaged_task<return_type()>>(
                    std::bind(std::forward<F>(f), std::forward<Args>(args)...)
                );
    /* 獲取future,用於獲得執行結果 */
    std::future<return_type> result = task->get_future();
    {
        std::unique_lock<std::mutex> lock(mutex_);
        tasks_.push([task] { (*task)(); });
        cond_.notify_one();
    }
    /* 返回future */
    return result;
}


int main()
{
    ThreadPool pool(4);
    std::vector<std::future<int>> results;
    for(int i = 0; i < 10; ++i)
    {
        results.emplace_back(
                    pool.enqueue(
                            [i]
                            {
                                return i * i;
                            }
                        )
                    );
    }
    for(auto&& result : results)
      std::cout << result.get() << std::endl;
    return 0;
}

std::async

對於std::async而言,感覺它的抽象要深一些,std::async用於非同步執行給定函式,並返回用於獲取函式返回值的std::future例項。所以std::async本質上應該是開啟一個執行緒執行給定函式,內部採用std::packaged_task對函式進行包裝,然後返回std::future

std::async建構函式有一個非同步屬性,分別是

  • std::launch::async,表示立即開啟非同步求值
  • std::launch::deferred,延遲開啟,只有當返回的future例項呼叫get函式時才開啟非同步求值

而預設情況下的非同步屬性是std::launch::async | std::launch::deferred,所以到底是立即開啟還是延遲開啟取決於編譯器的不同。如果非同步至關重要的話記得在建構函式中指定std::launch::async

示例:利用std::async實行並行std::for_each函式

std::for_each會對指定區間的每一個元素執行給定的函式,所以完全可以並行化。

template <class InputIt, class UnaryFunction>
UnaryFunction for_each(InputIt first, InputIt last, UnaryFunction f)
{
    auto count = tinystl::distance(first, last);
    if(!count)  return f;
    if(count <= 100)
    {
        tinystl::for_each(first, last, f);
    }
    else
    {
        auto middle = first;
        tinystl::advance(middle, count / 2);
        /* 開啟非同步操作對後半部分執行for_each */
        std::async(std::launch::async, tinystl::parallel::for_each<InputIt, UnaryFunction>, middle, last, f);
        /* 當前執行緒執行前半部分 */
        tinystl::for_each(first, middle, f);
    }
    return f;
}

小結

std::future提供了獲取非同步操作執行結果的機制,std::promise用於儲存值和異常,可以看成是訊息傳遞的一種,std::packaged_task用於對可呼叫物件的保證,std::async會開啟一個非同步操作,效果等同於建立新執行緒(或將執行函式新增到執行緒池),包裝執行緒函式,返回future例項