1. 程式人生 > >C++併發程式設計

C++併發程式設計

C++11相比之前的版本具有很多優秀的特性,比如lambda表示式,初始化列表,右值引用,自動型別推導。同時,C++11標準庫現在也支援正則表示式、智慧指標、多執行緒庫。

但現代C++在並行和非同步計算方面依然較為薄弱,特別是與C#等語言相比。

非同步的需要

為什麼需要支援非同步呢?多核處理器幾乎無處不在、並在雲中分佈的核,使得計算機體系結構變得越來越並行化和分散式化。軟體程式往往越來越多的由使用了位於單個機器或跨網路的多個核的各元件組成。現代程式語言需要提供對這種並行的支援。

同時,響應性(這是響應式程式設計的原則之一)已成為越來越不可或缺的軟體質量。

響應性的意思是在進行IO操作時不是阻塞住等待它的完成。在伺服器端不阻塞一個worker執行緒、而讓它繼續做其他的事情,待操作完成後等待下一個任務。在客戶端不阻塞主執行緒或GUI執行緒,否則將使程式變得反應遲鈍。因此能寫非同步程式碼對於管理IO操作的延遲越來越重要。例如,在WinRT中有一個規則,所有耗時超過50ms的IO密集型API只提供非同步介面,甚至沒有傳統的阻塞式介面可呼叫。

接下來我們看下C++目前提供的支援並行程式設計的方法,這些方法又有什麼新特點。我們可以看到標準方法,以及微軟提供的windows特定的PPL框架。

簡單示例

為了便於理解在C++中如何寫非同步程式碼,我們先寫一個簡單的示例:讀一個檔案,將其內容寫到另一個檔案中。

#include <string> 
#include <vector> 
#include <fstream> 
#include <iostream> 
using namespace std; 

vector<char> readFile(const
string& inPath) { ifstream file(inPath, ios::binary | ios::ate); size_t length = (size_t)file.tellg(); vector<char> buffer(length); file.seekg(0, std::ios::beg); file.read(&buffer[0], length); return buffer; } size_t writeFile(const vector<char>& buffer, const
string& outPath) { ofstream file(outPath, ios::binary); file.write(&buffer[0], buffer.size()); return (size_t)file.tellp(); }

由上述函式可以寫一個簡單的函式實現複製一個檔案的內容到另一個檔案中,並返回字元數目:

size_t sync_copyFile(const string& inFile, const string& outFile) 
{ 
    return writeFile(readFile(inFile), outFile); 
} 

顯然,我們希望依次執行readFile和writelFile函式。但是有必要阻塞的等待它們完成嗎?當然,這是一個人為的例子,如果檔案不是很大這一點可能無關緊要,如果檔案很大,我們可以使用快取及塊拷貝的方法代替將檔案內容填充到很大的vector返回。但是readFile和writeFile都是IO密集函式,在這裡只表示一種更復雜IO操作的模式。在真實程式中從網路中讀取資料、經過轉換後返回或寫入是很普遍的。

讓我們看看在標準C++中如何實現非同步執行copyFile操作。

基於任務的並行:future和promise

C++標準庫提供了一些支援併發的機制。首先是std::thread,以及相關的同步用物件(std::mutex, std::lock_guards, std::condition_variables等),最終提供了可寫“傳統的”基於多執行緒併發程式碼的便捷方法。

我們需要修改copyFile:建立一個新執行緒執行拷貝操作,並在執行緒執行完成後使用condition_variable進行通知。但是使用執行緒和鎖進行操作比較複雜麻煩。現代框架(例如.net中的TPL)以基於任務(task)的併發方式提供了更高階的抽象。一個task代表了一個可以與其他操作並行執行的非同步操作,系統隱藏了其實現的具體細節。

C++11標準庫標頭檔案也以promise和future提供對基於task並行的(有限地)支援。類std::promise和std::future類比.net中的Task、或者Java8中的Future。這兩個類總是成對出現,將呼叫函式與獲取執行結果分離開來。

當我們呼叫非同步函式時呼叫方並不是得到型別T的結果,而是一個std::future物件,這是一個將在未來某個時間點返回結果的佔位符。

一旦獲得future,我們可以繼續做其他事情,同時非同步任務也在一個獨立的執行緒中執行。

std::promise物件代表非同步呼叫被呼叫方的執行結果,這是一個將非同步結果傳給呼叫方的通道。當任務完成後,被呼叫方通過呼叫promise::set_value將結果賦給一個promise物件。

當呼叫方最後需要結果時可呼叫阻塞函式future::get()即可獲得。如果任務已經完成,結果可被立即使用,否則,呼叫執行緒將被掛起直到結果可用。

上述copyFile使用future和promise修改後的版本:

#include <future> 

size_t future_copyFile(const string& inFile, const string& outFile) 
{ 
    std::promise<vector<char>> prom1; 
    std::future<vector<char>> fut1 = prom1.get_future(); 
    std::thread th1([&prom1, inFile](){ 
        prom1.set_value(readFile(inFile)); 
    }); 

    std::promise<int> prom2; 
    std::future<int> fut2 = prom2.get_future(); 
    std::thread th2([&fut1, &prom2, outFile](){ 
        prom2.set_value(writeFile(fut1.get(), outFile)); 
    }); 

    size_t result = fut2.get(); 
    th1.join(); 
    th2.join(); 
    return result; 
} 

注意到我們將readFile和writeFile的執行移動到了單獨的task中,但是我們仍然需要設定、啟動執行緒執行它們。並且,在lambda表示式中捕獲了需要的promise和future物件,以便可以在lambda表示式中直接使用。第一個執行緒執行讀操作,當讀完成時將結果填入vector型別的promise中。第二個執行緒阻塞地等待,當第一個執行緒讀操作完成時,結果會傳入此執行緒的寫函式中。最後,寫操作完成時,寫的字元數填入第二個future中。

主執行緒可以有效利用這種並行,在呼叫future::get()獲得最終結果之前可以做其他的事情。當然,當呼叫future::get()時如果讀寫操作還未完成,主執行緒還是會阻塞。

Packaged tasks

可以用packaged_task稍微簡化上述程式碼。類std::packaged_task 是一個task及其promise的容器。它的模板型別T是一個任務函式型別(例如,對於上面的readFile函式,T就是vector(const string&) )。它是一個可呼叫型別(定義了operator()),併為我們自動建立和管理了一個std::promise物件。

size_t packagedtask_copyFile(const string& inFile, const string& outFile) 
{ 
    using Task_Type_Read = vector<char>(const string&); 
    packaged_task<Task_Type_Read> pt1(readFile); 
    future<vector<char>> fut1{ pt1.get_future() }; 
    thread th1{ move(pt1), inFile }; 

    using Task_Type_Write = size_t(const string&); 
    packaged_task<Task_Type_Write> pt2([&fut1](const string& path){ 
        return writeFile(fut1.get(), path); 
    }); 
    future<size_t> fut2{ pt2.get_future() }; 
    thread th2{ move(pt2), outFile }; 

    size_t result = fut2.get(); 
    th1.join(); 
    th2.join(); 
    return result; 
} 

(譯者注:promise封裝的是資料型別,packaged_task封裝的是可呼叫物件,兩者都有get_future()介面獲得future物件)

注意到需要使用move()函式傳遞packaged_task給執行緒,因為packaged_task型別無法被拷貝。

std::async

使用packaged_task後代碼沒有改變多少,稍微好讀了一些,但是我們仍然需要手動建立執行緒、決定task執行在哪個執行緒中。

如果我們使用標準庫中的std::async函式,一切會變得非常簡單。傳入一個lambda或functor,std::async會返回包含執行結果值的future。copyFile函式用std::async()修改後的版本如下:

size_t async_copyFile(const string& inFile, const string& outFile) 
{ 
    auto fut1 = async(readFile, inFile); 
    auto fut2 = async([&fut1](const string& path){ 
        return writeFile(fut1.get(), path); 
    }, 
    outFile); 

    return fut2.get(); 
} 

從某種程度上來說std::async()相當於TPL任務排程。它會決定在哪個執行緒中執行任務,或在新建立的新執行緒中,或在重複使用的老執行緒中。

也可以通過第二個引數決定啟動策略,“async”表示立即非同步地執行任務,可能是在不同的執行緒中,“deferred”表示只有在get()被呼叫的時候才開始執行任務。

std::async的優點是隱藏了所有的實現、平臺相關的細節。檢視vs2013中標頭檔案,可以看到其在windows平臺下的實現,內部使用了相當於.NET TPL的PPL(Parallel Patterns Library)。

PPL

上面看到的都是C++11標準化的內容。客觀上來說future和promise的使用依然是有限的,特別是跟C#和.net相比。

主要的限制是在C++11中future不是可組合的(composable)。如果啟動多個task平行計算,我們無法實現阻塞所有的future、等待其中任意一個完成,但在某一刻只有一個future返回。同樣,也沒有好的方法將一組task組合成一個序列,每個task的結果作為下一個task的輸入。可組合task可以使整個體系結構非阻塞且事件驅動。真的希望C++中也有類似任務延續(task continuations)或async/await的模式。

有了PPL(又名Concurrency Runtime)微軟有可能突破標準的束縛,嘗試task庫的更復雜的實現。

在PPL中,類Concurrency::task(定義在< ppltasks.h>標頭檔案中)代表了一個task。一個task等價於一個future。同時也提供了相同的阻塞方法以獲取結果——get()。模板引數T表示返回型別,task進行初始化時將工作函式傳入其中(lambda表示式、函式指標或函式物件)。

下面暫時不考慮可移植性,重新實現copyFile函式:

size_t ppl_copyFile(const string& inFile, const string& outFile) 
{ 
    Concurrency::task<vector<char>> tsk1 = Concurrency::create_task([inFile]() { 
        return readFile(inFile); 
    }); 
    Concurrency::task<size_t> tsk2 = Concurrency::create_task([&tsk1, outFile]() { 
        return writeFile(tsk1.get(), outFile); 
    }); 
    return tsk2.get(); 
} 

此處建立了2個task物件,由兩個lambda表示式進行初始化,分別表示讀和寫操作。

現在我們真正不用考慮執行緒問題了。由PPL排程來決定在哪執行task、如何管理一個執行緒池。要注意我仍然要手動地協調兩個task之間的相互作用關係:task2擁有task1的引用,明確地等待task1結束後再使用其結果。這在像這種簡單例子中是可以接受的,但如果是更多task、更復雜的程式碼,這將會變得非常繁瑣。

任務延續(Task continuations)

與future不同,PPL的task支援通過延續來進行組合。task::next方法可以將一個個task延續起來。當前序task完成時將返回其結果並呼叫後續task。

這次使用任務延續重寫copyFile:

size_t ppl_then_copyFile(const string& inFile, const string& outFile) 
{ 
    Concurrency::task<size_t> result =  
    Concurrency::create_task([inFile]() { 
        return readFile(inFile); 
    }).then([outFile](const vector<char>& buffer) { 
        return writeFile(buffer, outFile); 
    }); 

    return result.get(); 
} 

這段程式碼非常簡潔。我們把copy函式的邏輯拆分成兩個獨立的、可以執行在任意執行緒的部分(task),並被一個task排程程式執行。

上述copyFile的實現仍然會在最後處阻塞以獲得最終結果,但在一個真實的程式中可以僅返回task物件,插入到程式邏輯中,新增到一個任務延續中非同步操作其結果。例如:

Concurrency::task<size_t> ppl_create_copyFile_task(const string& inFile, const string& outFile) 
{ 
    return Concurrency::create_task([inFile]() { 
        return readFile(inFile); 
    }).then([outFile](const vector<char>& buffer) { 
        return writeFile(buffer, outFile); 
    }); 
} 
... 
auto tCopy = ppl_create_copyFile_task(inFile, outFile).then([](size_t written) { 
    cout << written << endl; 
}); 
... 
tCopy.wait(); 

PPL同時還提供了其他方法來組合任務,task::wait_all和task::wait_any,對管理並行執行的一組任務很有幫助。

對於一組task,when_all會在所有task完成後建立另一個task(內部實現了join)。相反,when_any會在任意一個task完成時建立一個另一個task,這在有些地方是很有用的,比如要限制task的並行數量、只有在一個task完成後才開始一個新task。

以上只是PPL的冰山一角。PPL提供了非常豐富的函式集,幾乎與其託管版本相當。還提供了一些排程類,用於實現管理工作執行緒池、分配task到執行緒中等重要任務。詳細請見此處

C++17前瞻

很快我們有希望在C++標準中見到上面介紹的一些PPL中的實現。這裡已經有一份Niklas Gustaffson等人寫的檔案N3857,它提出了一些標準的變化。特別地,提供了 future::then, future::when_any and future::when_all實現future的可組合,這同PPL有著相同的語義。對於之前的例子,用新的future可以很方便的寫成如下程式碼:

future<size_t> future_then_copyFile(const string& inFile, const string& outFile)
{
    return async([inFile]() {
        return readFile(inFile);
    }).then([outFile](const vector<char>& buffer) {
        return writeFile(buffer, outFile);
    });
}

還有函式 future::is_ready用於在呼叫阻塞函式get()之前先測試future是否已完成,future::unwrap用於管理巢狀future(future返回的還是future)。當然,此方案的所有細節都可以在上述提供的檔案中找到。

這就是完美解決方案了?不完全是。在.net中的經驗告訴我們基於task的非同步程式碼仍然很難編寫、除錯、維護。有時候非常困難。這就是為什麼為了通過async/await模式更方便的管理非同步任務,添加了一些新關鍵詞到C# 5.0中。在C++世界是否有類似的方法呢?

相關推薦

Python3 與 C# 併發程式設計之~ 執行緒上篇

2.2.加強篇¶ 其實以前的Linux中是沒有執行緒這個概念的,Windows程式設計師經常使用執行緒,這一看~方便啊,然後可能是當時程式設計師偷懶了,就把程序模組改了改(這就是為什麼之前說Linux下的多程序程式設計其實沒有Win下那麼“重量級”),弄了個精簡版程序==>執行緒(核心是分不出程序

C++併發程式設計(中文版)

  地址: https://legacy.gitbook.com/book/chenxiaowei/cpp_concurrency_in_action/details     是《C++ Concurrency in Action》一書的中文翻譯

C#併發程式設計經典例項》學習筆記—2.2 返回完成的任務

問題: 如何實現一個具有非同步簽名的同步方法。 從非同步介面或基類繼承程式碼,但希望用同步方式實現方法。 解釋一下所謂的非同步介面和非同步基類。例如如下程式碼 interface IMyAsyncI

c++併發程式設計之thread::join()和thread::detach()

thread::join(): 阻塞當前執行緒,直至 *this 所標識的執行緒完成其執行。*this 所標識的執行緒的完成同步於從 join() 的成功返回。 該方法簡單暴力,主執行緒等待子程序期間什麼都不能做。thread::join()會清理子執行緒相關的記憶體空間,

C#併發程式設計經典例項》—— 超時

問題 我們希望事件能在預定的時間內到達,即使事件不到達,也要確保程式能及時進行響應。 通常此類事件是單一的非同步操作(例如,等待 Web 服務請求的響應)。 解決方案 Timeout 操 作 符 在 輸 入 流 上 建 立 一 個 可 調 節 的 超 時 窗 口。 一 旦 新 的 事 件 到

C#併發程式設計經典例項》—— 用限流和抽樣抑制事件流

問題 有時事件來得太快,這是編寫響應式程式碼時經常碰到的問題。一個速度太快的事件流可導 致程式的處理過程崩潰。 解決方案 Rx 專門提供了幾個操作符,用來對付大量湧現的事件資料。Throttle 和 Sample 這兩個操 作符提供了兩種不同方法來抑制快速湧來的輸入事件。 Throttle

C#併發程式設計經典例項》—— 用視窗和緩衝對事件分組

問題 有一系列事件,需要在它們到達時進行分組。舉個例子,需要對一些成對的輸入作出響 應。第二個例子,需要在 2 秒鐘的視窗期內,對所有輸入進行響應。 解決方案 Rx 提 供 了 兩 個 對 到 達 的 序 列 進 行 分 組 的 操 作:Buffer 和 Window。Buffer 會 留

C#併發程式設計經典例項》—— 傳送通知給上下文

問題 Rx 儘量做到了執行緒不可知(thread agnostic)。因此它會在任意一個活動執行緒中發出通知(例如 OnNext)。 但是我們通常希望通知只發給特定的上下文。例如 UI 元素只能被它所屬的 UI 執行緒控制, 因此,如果要根據 Rx 的通知來修改 UI,就應該把通知“轉移”到

C#併發程式設計經典例項》—— Rx基礎

LINQ 是 對 序 列 數 據 進 行 查 詢 的 一 系 列 語 言 功 能。 內 置 的 LINQ to Objects( 基 於 IEnumerable<T>) 和 LINQ to Entities( 基 於 IQueryable<T>) 是 兩 個 最 常

C#併發程式設計經典例項》—— 轉換.NET事件

問題 把一個事件作為 Rx 輸入流,每次事件發生時通過 OnNext 生成資料。 解決方案 Observable 類 定 義 了 一 些 事 件 轉 換 器。 大 部 分 .NET 框 架 事 件 與 FromEventPattern 兼 容, 對於不遵循通用模式的事件,需要改用 FromE

C++ 併發程式設計》- 第1章 你好,C++的併發世界

本文是《C++ 併發程式設計》的第一章,感謝人民郵電出版社授權併發程式設計網發表此文,版權所有,請勿轉載。該書將於近期上市。 本章主要內容 何謂併發和多執行緒  為什麼要在應用程式中使用併發和多執行緒  C++併發支援的發展歷程  一個簡單的C++多執行緒程式是什麼樣的 這是C++使用者

C#併發程式設計經典例項》學習筆記—2.4 等待一組任務完成

問題 執行幾個任務,等待它們全部完成。 使用場景 幾個獨立任務需要同時進行 UI介面載入多個模組,併發請求 解決方案 Task.WhenAll 傳入若干任務,當所有任務完成時,返回一個完成的任務。 過載方法 Task WhenAll(IEnumerable<Task>

C++併發程式設計C++11)

前言 首先需要說明,本部落格的主要內容參考自Forhappy && Haippy博主的分享,本人主要是參照博主的資料進行了學習和總結,並適當的衍生或補充了相關的其他知識內容。 C++11有了std::thread 以後,可以在語言層面編寫多執

C++併發程式設計2——為共享資料加鎖(三)

正交——消除無關事務之間的影響,力求高內聚低耦合。 死鎖的概念略去不說,死鎖有可能發生在使用多個互斥量的場景下,也可能存在沒有使用互斥量的場景: 兩個執行緒都在等待對方釋放互斥量兩個執行緒都呼叫了對方的join()函式 為了解決兩個執行緒都在等待對方釋放互斥量導致的

c++併發程式設計實戰(C++11)pdf 高清

C++併發程式設計實戰PDF高清完整版下載。C++併發程式設計實戰PDF是一本非常熱門的電子圖書。這本書籍是由由威廉姆斯所著的,裡面擁有非常詳細的講解,對於新手來說是本不錯的書。 下載地址:http://download.csdn.net/download/l

C++併發程式設計2——為保護資料加鎖(一)

找到問題的解決辦法,而不是找蹩腳的介面。 在應屆生面試的時候,很多面試官都會問——“多執行緒如何共享資源”。在作業系統層面上可以給出若干關鍵詞答案,但是在語言層面,這個問題考慮的就沒有那麼簡單了。同時,很多人會將多執行緒資料共享和執行緒同步混淆。有關執行緒同步,我們

【 專欄 】- Theron框架——C++併發程式設計

Theron框架——C++併發程式設計庫 基於Actor模型的Theron是近些年發展起來的一個C++併發程式設計框架,它的優勢就是基於Actor模型,以訊息傳遞方式大大減少了目前C++多執行緒開發複雜化,鎖機制等等煩惱。隨著深入

C++併發程式設計

C++11相比之前的版本具有很多優秀的特性,比如lambda表示式,初始化列表,右值引用,自動型別推導。同時,C++11標準庫現在也支援正則表示式、智慧指標、多執行緒庫。 但現代C++在並行和非同步計算方面依然較為薄弱,特別是與C#等語言相比。 非同步的

Python3 與 C# 併發程式設計之~程序先導篇

2.2.殭屍程序和孤兒程序¶ 先看看定義: 孤兒程序 :一個父程序退出,而它的一個或多個子程序還在執行,那麼那些子程序將成為孤兒程序。孤兒程序將被init程序(程序號為1)所收養,並由init程序對它們完成狀態收集工作。 殭屍程序 :一個程序使用fork建立子程序,如果子程序退出,而父程序並沒有

Python3 與 C# 併發程式設計之~ 程序篇

NetCore併發程式設計¶  Python的執行緒、並行、協程下次說 先簡單說下概念(其實之前也有說,所以簡說下): 併發:同時做多件事情 多執行緒:併發的一種形式 並行處理:多執行緒的一種(執行緒池產生的一種併發型別,eg:非同步程式設計) 響應式程式設計:一種程式設計模式,對事件