1. 程式人生 > >使用C++編寫linux多執行緒程式

使用C++編寫linux多執行緒程式

前言

在這個多核時代,如何充分利用每個 CPU 核心是一個繞不開的話題,從需要為成千上萬的使用者同時提供服務的服務端應用程式,到需要同時開啟十幾個頁面,每個頁面都有幾十上百個連結的 web 瀏覽器應用程式,從保持著幾 t 甚或幾 p 的資料的資料庫系統,到手機上的一個有良好使用者響應能力的 app,為了充分利用每個 CPU 核心,都會想到是否可以使用多執行緒技術。這裡所說的“充分利用”包含了兩個層面的意思,一個是使用到所有的核心,再一個是核心不空閒,不讓某個核心長時間處於空閒狀態。在 C++98 的時代,C++標準並沒有包含多執行緒的支援,人們只能直接呼叫作業系統提供的 SDK API 來編寫多執行緒程式,不同的作業系統提供的 SDK API 以及執行緒控制能力不盡相同,到了 C++11,終於在標準之中加入了正式的多執行緒的支援,從而我們可以使用標準形式的類來建立與執行執行緒,也使得我們可以使用標準形式的鎖、原子操作、執行緒本地儲存 (TLS) 等來進行復雜的各種模式的多執行緒程式設計,而且,C++11 還提供了一些高階概念,比如 promise/future,packaged_task,async 等以簡化某些模式的多執行緒程式設計。

多執行緒可以讓我們的應用程式擁有更加出色的效能,同時,如果沒有用好,多執行緒又是比較容易出錯的且難以查詢錯誤所在,甚至可以讓人們覺得自己陷進了泥潭,希望本文能夠幫助您更好地使用 C++11 來進行 Linux 下的多執行緒程式設計。

認識多執行緒

首先我們應該正確地認識執行緒。維基百科對執行緒的定義是:執行緒是一個編排好的指令序列,這個指令序列(執行緒)可以和其它的指令序列(執行緒)並行執行,作業系統排程器將執行緒作為最小的 CPU 排程單元。在進行架構設計時,我們應該多從作業系統執行緒排程的角度去考慮應用程式的執行緒安排,而不僅僅是程式碼。

當只有一個 CPU 核心可供排程時,多個執行緒的執行示意如下:

圖 1、單個 CPU 核心上的多個執行緒執行示意圖

我們可以看到,這時的多執行緒本質上是單個 CPU 的時間分片,一個時間片執行一個執行緒的程式碼,它可以支援併發處理,但是不能說是真正的平行計算。

當有多個 CPU 或者多個核心可供排程時,可以做到真正的平行計算,多個執行緒的執行示意如下:

圖 2、雙核 CPU 上的多個執行緒執行示意圖

從上述兩圖,我們可以直接得到使用多執行緒的一些常見場景:

  • 程序中的某個執行緒執行了一個阻塞操作時,其它執行緒可以依然執行,比如,等待使用者輸入或者等待網路資料包的時候處理啟動後臺執行緒處理業務,或者在一個遊戲引擎中,一個執行緒等待使用者的互動動作輸入,另外一個執行緒在後臺合成下一幀要畫的影象或者播放背景音樂等。
  • 將某個任務分解為小的可以並行進行的子任務,讓這些子任務在不同的 CPU 或者核心上同時進行計算,然後彙總結果,比如歸併排序,或者分段查詢,這樣子來提高任務的執行速度。

需要注意一點,因為單個 CPU 核心下多個執行緒並不是真正的並行,有些問題,比如 CPU 快取不一致問題,不一定能表現出來,一旦這些程式碼被放到了多核或者多 CPU 的環境執行,就很可能會出現“在開發測試環境一切沒有問題,到了實施現場就莫名其妙”的情況,所以,在進行多執行緒開發時,開發與測試環境應該是多核或者多 CPU 的,以避免出現這類情況。

 

C++11 的執行緒類 std::thread

C++11 的標準類 std::thread 對執行緒進行了封裝,它的宣告放在標頭檔案 thread 中,其中聲明瞭執行緒類 thread, 執行緒識別符號 id,以及名字空間 this_thread,按照 C++11 規範,這個標頭檔案至少應該相容如下內容:

清單 1.例子 thread 標頭檔案主要內容
namespace std{
 struct thread{
 // native_handle_type 是連線 thread 類和作業系統 SDK API 之間的橋樑。
 typedef implementation-dependent native_handle_type;
 native_handle_type native_handle();
 //
 struct id{
 id() noexcept;
 // 可以由==, < 兩個運算衍生出其它大小關係運算。
 bool operator==(thread::id x, thread::id y) noexcept;
 bool operator<(thread::id x, thread::id y) noexcept;
 template<class charT, class traits>
 basic_ostream<charT, traits>&
 operator<<(basic_ostream<charT, traits>&out, thread::id id);
 // 雜湊函式
 template <class T> struct hash;
 template <> struct hash<thread::id>;
 };
 id get_id() const noexcept;
 // 構造與析構
 thread() noexcept;
 template<class F, class… Args> explicit thread(F&f, Args&&… args);
 ~thread();
 thread(const thread&) = delete;
 thread(thread&&) noexcept;
 thread& operator=( const thread&) = delete;
 thread& operator=(thread&&) noexcept;
 //
 void swap(thread&) noexcept;
 bool joinable() const noexcept;
 void join();
 void detach();
 // 獲取物理執行緒數目
 static unsigned hardware_concurrency() noexcept;
 }
 namespace this_thead{
 thread::id get_id();
 void yield();
 template<class Clock, class Duration>
 void sleep_until(const chrono::time_point<Clock, Duration>& abs_time);
 template<class Rep, class Period>
 void sleep_for(const chromo::duration<Rep, Period>& rel_time);
 }
}

和有些語言中定義的執行緒不同,C++11 所定義的執行緒是和操作系的執行緒是一一對應的,也就是說我們生成的執行緒都是直接接受作業系統的排程的,通過作業系統的相關命令(比如 ps -M 命令)是可以看到的,一個程序所能建立的執行緒數目以及一個作業系統所能建立的總的執行緒數目等都由執行時作業系統限定。

native_handle_type 是連線 thread 類和作業系統 SDK API 之間的橋樑,在 g++(libstdc++) for Linux 裡面,native_handle_type 其實就是 pthread 裡面的 pthread_t 型別,當 thread 類的功能不能滿足我們的要求的時候(比如改變某個執行緒的優先順序),可以通過 thread 類例項的 native_handle() 返回值作為引數來呼叫相關的 pthread 函式達到目的。thread::id 定義了在執行時作業系統內唯一能夠標識該執行緒的識別符號,同時其值還能指示所標識的執行緒的狀態,其預設值 (thread::id()) 表示不存在可控的正在執行的執行緒(即空執行緒,比如,呼叫 thead() 生成的沒有指定入口函式的執行緒類例項),當一個執行緒類例項的 get_id() 等於預設值的時候,即 get_id() == thread::id(),表示這個執行緒類例項處於下述狀態之一:

  • 尚未指定執行的任務
  • 執行緒執行完畢
  • 執行緒已經被轉移 (move) 到另外一個執行緒類例項
  • 執行緒已經被分離 (detached)

空執行緒 id 字串表示形式依具體實現而定,有些編譯器為 0x0,有些為一句語義解釋。

有時候我們需要線上程執行程式碼裡面對當前呼叫者執行緒進行操作,針對這種情況,C++11 裡面專門定義了一個名字空間 this_thread,其中包括 get_id() 函式可用來獲取當前呼叫者執行緒的 id,yield() 函式可以用來將呼叫者執行緒跳出執行狀態,重新交給作業系統進行排程,sleep_until 和 sleep_for 函式則可以讓呼叫者執行緒休眠若干時間。get_id() 函式實際上是通過呼叫 pthread_self() 函式獲得呼叫者執行緒的識別符號,而 yield() 函式則是通過呼叫作業系統 API sched_yield() 進行排程切換。

如何建立和結束一個執行緒

和 pthread_create 不同,使用 thread 類建立執行緒可以使用一個函式作為入口,也可以是其它的 Callable 物件,而且,可以給入口傳入任意個數任意型別的引數:

清單 2.例子 thread_run_func_var_args.cc
int funcReturnInt(const char* fmt, ...){
 va_list ap;
 va_start(ap, fmt);
 vprintf( fmt, ap );
 va_end(ap);
 return 0xabcd;
}
void threadRunFunction(void){
 thread* t = new thread(funcReturnInt, "%d%s\n", 100, "\%");
 t->join();
 delete t;
}

一個類的成員函式也可以作為執行緒入口:

清單 4.例子 thread_run_member_func.cc
struct God{
 void create(const char* anything){
 cout << "create " << anything << endl;
 }
};
void threadRunMemberFunction(void){
 God god;
 thread* t = new thread( &God::create, god, "the world" );
 t->join();
 delete t;
}

雖然 thread 類的初始化可以提供這麼豐富和方便的形式,其實現的底層依然是建立一個 pthread 執行緒並執行之,有些實現甚至是直接呼叫 pthread_create 來建立。

建立一個執行緒之後,我們還需要考慮一個問題:該如何處理這個執行緒的結束?一種方式是等待這個執行緒結束,在一個合適的地方呼叫 thread 例項的 join() 方法,呼叫者執行緒將會一直等待著目標執行緒的結束,當目標執行緒結束之後呼叫者執行緒繼續執行;另一個方式是將這個執行緒分離,由其自己結束,通過呼叫 thread 例項的 detach() 方法將目標執行緒置於分離模式。一個執行緒的 join() 方法與 detach() 方法只能呼叫一次,不能在呼叫了 join() 之後又呼叫 detach(),也不能在呼叫 detach() 之後又呼叫 join(),在呼叫了 join() 或者 detach() 之後,該執行緒的 id 即被置為預設值(空執行緒),表示不能繼續再對該執行緒作修改變化。如果沒有呼叫 join() 或者 detach(),那麼,在析構的時候,該執行緒例項將會呼叫 std::terminate(),這會導致整個程序退出,所以,如果沒有特別需要,一般都建議在生成子執行緒後呼叫其 join() 方法等待其退出,這樣子最起碼知道這些子執行緒在什麼時候已經確保結束。

在 C++11 裡面沒有提供 kill 掉某個執行緒的能力,只能被動地等待某個執行緒的自然結束,如果我們要主動停止某個執行緒的話,可以通過呼叫 Linux 作業系統提供的 pthread_kill 函式給目標執行緒傳送訊號來實現,示例如下:

清單 5.例子 thread_kill.cc
static void on_signal_term(int sig){
 cout << "on SIGTERM:" << this_thread::get_id() << endl;
 pthread_exit(NULL); 
}
void threadPosixKill(void){
 signal(SIGTERM, on_signal_term);
 thread* t = new thread( [](){
 while(true){
 ++counter;
 }
 });
 pthread_t tid = t->native_handle();
 cout << "tid=" << tid << endl;
 // 確保子執行緒已經在執行。
 this_thread::sleep_for( chrono::seconds(1) );
 pthread_kill(tid, SIGTERM);
 t->join();
 delete t;
 cout << "thread destroyed." << endl;
}

 

上述例子還可以用來給某個執行緒傳送其它訊號,具體的 pthread_exit 函式呼叫的約定依賴於具體的作業系統的實現,所以,這個方法是依賴於具體的作業系統的,而且,因為在 C++11 裡面沒有這方面的具體約定,用這種方式也是依賴於 C++編譯器的具體實現的。

執行緒類 std::thread 的其它方法和特點

thread 類是一個特殊的類,它不能被拷貝,只能被轉移或者互換,這是符合執行緒的語義的,不要忘記這裡所說的執行緒是直接被作業系統排程的。執行緒的轉移使用 move 函式,示例如下:

清單 6.例子 thread_move.cc
void threadMove(void){
 int a = 1;
 thread t( [](int* pa){
 for(;;){
 *pa = (*pa * 33) % 0x7fffffff;
 if ( ( (*pa) >> 30) & 1) break;
 }
 }, &a);
 thread t2 = move(t);	// 改為 t2 = t 將不能編譯。
 t2.join();
 cout << "a=" << a << endl;
}

在這個例子中,如果將 t2.join() 改為 t.join() 將會導致整個程序被結束,因為忘記了呼叫 t2 也就是被轉移的執行緒的 join() 方法,從而導致整個程序被結束,而 t 則因為已經被轉移,其 id 已被置空。

執行緒例項互換使用 swap 函式,示例如下:

清單 7.例子 thread_swap.cc
void threadSwap(void){
 int a = 1;
 thread t( [](int* pa){
 for(;;){
 *pa = (*pa * 33) % 0x7fffffff;
 if ( ( (*pa) >> 30) & 1) break;
 }
 }, &a);
 thread t2;
 cout << "before swap: t=" << t.get_id() 
 << ", t2=" << t2.get_id() << endl;
 swap(t, t2);
 cout << "after swap : t=" << t.get_id() 
 << ", t2=" << t2.get_id() << endl;
 t2.join();
 cout << "a=" << a << endl;
}

互換和轉移很類似,但是互換僅僅進行例項(以 id 作標識)的互換,而轉移則在進行例項標識的互換之前,還進行了轉移目的例項(如下例的t2)的清理,如果 t2 是可聚合的(joinable() 方法返回 true),則呼叫 std::terminate(),這會導致整個程序退出,比如下面這個例子:

清單 8.例子 thread_move_term.cc
void threadMoveTerm(void){
 int a = 1;
 thread t( [](int* pa){
 for(;;){
 *pa = (*pa * 33) % 0x7fffffff;
 if ( ( (*pa) >> 30) & 1) break;
 }
 }, &a);
 thread t2( [](){
 int i = 0;
 for(;;)i++;
 } );
 t2 = move(t);	// 將會導致 std::terminate()
 cout << "should not reach here" << endl;
 t2.join();
}

所以,在進行執行緒例項轉移的時候,要注意判斷目的例項的 id 是否為空值(即 id())。

如果我們繼承了 thread 類,則還需要禁止拷貝建構函式、拷貝賦值函式以及賦值操作符過載函式等,另外,thread 類的解構函式並不是虛解構函式。示例如下:

清單 9.例子 thread_inherit.cc
class MyThread : public thread{
public:
 MyThread() noexcept : thread(){};
 template<typename Callable, typename... Args>
 explicit
 MyThread(Callable&& func, Args&&... args) : 
 thread( std::forward<Callable>(func), 
 std::forward<Args>(args)...){
 }
 ~MyThread() { thread::~thread(); }
 // disable copy constructors
 MyThread( MyThread& ) = delete;
 MyThread( const MyThread& ) = delete;
 MyThread& operator=(const MyThread&) = delete;
};

 

因為 thread 類的解構函式不是虛解構函式,在上例中,需要避免出現下面這種情況:

MyThread* tc = new MyThread(...);

...

thread* tp = tc;

...

delete tp;

這種情況會導致 MyThread 的解構函式沒有被呼叫。

執行緒的排程

我們可以呼叫 this_thread::yield() 將當前呼叫者執行緒切換到重新等待排程,但是不能對非呼叫者執行緒進行排程切換,也不能讓非呼叫者執行緒休眠(這是作業系統排程器乾的活)。

清單 10.例子 thread_yield.cc

 

void threadYield(void){
 unsigned int procs = thread::hardware_concurrency(), // 獲取物理執行緒數目
 i = 0;
 thread* ta = new thread( [](){
 struct timeval t1, t2;
 gettimeofday(&t1, NULL);
 for(int i = 0, m = 13; i < COUNT; i++, m *= 17){
 this_thread::yield();
 }
 gettimeofday(&t2, NULL);
 print_time(t1, t2, " with yield");
 } );
 thread** tb = new thread*[ procs ];
 for( i = 0; i < procs; i++){
 tb[i] = new thread( [](){
 struct timeval t1, t2;
 gettimeofday(&t1, NULL);
 for(int i = 0, m = 13; i < COUNT; i++, m *= 17){
 do_nothing();
 }
 gettimeofday(&t2, NULL);
 print_time(t1, t2, "without yield");
 });
 }
 ta->join();
 delete ta;
 for( i = 0; i < procs; i++){
 tb[i]->join();
 delete tb[i];
 };
 delete tb;
}

ta 執行緒因為需要經常切換去重新等待排程,它執行的時間要比 tb 要多,比如在作者的機器上執行得到如下結果:

$time ./a.out
without yield elapse 0.050199s
without yield elapse 0.051042s
without yield elapse 0.05139s
without yield elapse 0.048782s
 with yield elapse 1.63366s
real	0m1.643s
user	0m1.175s
sys	0m0.611s

ta 執行緒即使扣除系統呼叫執行時間 0.611s 之後,它的執行時間也遠大於沒有進行切換的執行緒。

C++11 沒有提供調整執行緒的排程策略或者優先順序的能力,如果需要,只能通過呼叫相關的 pthread 函式來進行,需要的時候,可以通過呼叫 thread 類例項的 native_handle() 方法或者作業系統 API pthread_self() 來獲得 pthread 執行緒 id,作為 pthread 函式的引數。

執行緒間的資料互動和資料爭用 (Data Racing)

同一個程序內的多個執行緒之間多是免不了要有資料互相來往的,佇列和共享資料是實現多個執行緒之間的資料互動的常用方式,封裝好的佇列使用起來相對來說不容易出錯一些,而共享資料則是最基本的也是較容易出錯的,因為它會產生資料爭用的情況,即有超過一個執行緒試圖同時搶佔某個資源,比如對某塊記憶體進行讀寫等,如下例所示:

清單 11.例子 thread_data_race.cc
static void
inc(int *p ){
 for(int i = 0; i < COUNT; i++){
 (*p)++;
 }
}
void threadDataRacing(void){
 int a = 0;
 thread ta( inc, &a);
 thread tb( inc, &a);
 ta.join();
 tb.join();
 cout << "a=" << a << endl;
}

 

這是簡化了的極端情況,我們可以一眼看出來這是兩個執行緒在同時對&a 這個記憶體地址進行寫操作,但是在實際工作中,在程式碼的海洋中發現它並不一定容易。從表面看,兩個執行緒執行完之後,最後的 a 值應該是 COUNT * 2,但是實際上並非如此,因為簡單如 (*p)++這樣的操作並不是一個原子動作,要解決這個問題,對於簡單的基本型別資料如字元、整型、指標等,C++提供了原子模版類 atomic,而對於複雜的物件,則提供了最常用的鎖機制,比如互斥類 mutex,門鎖 lock_guard,唯一鎖 unique_lock,條件變數 condition_variable 等。

現在我們使用原子模版類 atomic 改造上述例子得到預期結果:

清單 12.例子 thread_atomic.cc
static void
inc(atomic<int> *p ){
 for(int i = 0; i < COUNT; i++){
 (*p)++;
 }
}
void threadDataRacing(void){
 atomic<int> a(0) ;
 thread ta( inc, &a);
 thread tb( inc, &a);
 ta.join();
 tb.join();
 cout << "a=" << a << endl;
}

我們也可以使用 lock_guard,lock_guard 是一個範圍鎖,本質是 RAII(Resource Acquire Is Initialization),在構建的時候自動加鎖,在析構的時候自動解鎖,這保證了每一次加鎖都會得到解鎖。即使是呼叫函式發生了異常,在清理棧幀的時候也會呼叫它的解構函式得到解鎖,從而保證每次加鎖都會解鎖,但是我們不能手工呼叫加鎖方法或者解鎖方法來進行更加精細的資源佔用管理,使用 lock_guard 示例如下:

清單 13.例子 thread_lock_guard.cc
static mutex g_mutex;
static void
inc(int *p ){
 for(int i = 0; i < COUNT; i++){
 lock_guard<mutex> _(g_mutex);
 (*p)++;
 }
}
void threadLockGuard(void){
 int a = 0;
 thread ta( inc, &a);
 thread tb( inc, &a);
 ta.join();
 tb.join();
 cout << "a=" << a << endl;
}

如果要支援手工加鎖,可以考慮使用 unique_lock 或者直接使用 mutex。unique_lock 也支援 RAII,它也可以一次性將多個鎖加鎖;如果使用 mutex 則直接呼叫 mutex 類的 lock, unlock, trylock 等方法進行更加精細的鎖管理:

清單 14.例子 thread_mutex.cc
static mutex g_mutex;
static void
inc(int *p ){
 thread_local int i; // TLS 變數
 for(; i < COUNT; i++){
 g_mutex.lock();
 (*p)++;
 g_mutex.unlock();
 }
}
void threadMutex(void){
 int a = 0;
 thread ta( inc, &a);
 thread tb( inc, &a);
 ta.join();
 tb.join();
 cout << "a=" << a << endl;
}

在上例中,我們還使用了執行緒本地儲存 (TLS) 變數,我們只需要在變數前面宣告它是 thread_local 即可。TLS 變數線上程棧內分配,執行緒棧只有在執行緒建立之後才生效,線上程退出的時候銷燬,需要注意不同系統的執行緒棧的大小是不同的,如果 TLS 變數佔用空間比較大,需要注意這個問題。TLS 變數一般不能跨執行緒,其初始化在呼叫執行緒第一次使用這個變數時進行,預設初始化為 0。

對於執行緒間的事件通知,C++11 提供了條件變數類 condition_variable,可視為 pthread_cond_t 的封裝,使用條件變數可以讓一個執行緒等待其它執行緒的通知 (wait,wait_for,wait_until),也可以給其它執行緒傳送通知 (notify_one,notify_all),條件變數必須和鎖配合使用,在等待時因為有解鎖和重新加鎖,所以,在等待時必須使用可以手工解鎖和加鎖的鎖,比如 unique_lock,而不能使用 lock_guard,示例如下:

清單 15.例子 thread_cond_var.cc
#include <thread>
#include <iostream>
#include <condition_variable>
using namespace std;
mutex m;
condition_variable cv;
void threadCondVar(void){
# define THREAD_COUNT 10
 thread** t = new thread*[THREAD_COUNT];
 int i;
 for(i = 0; i < THREAD_COUNT; i++){
 t[i] = new thread( [](int index){
 unique_lock<mutex> lck(m);
 cv.wait_for(lck, chrono::hours(1000));
 cout << index << endl;
 }, i );
 this_thread::sleep_for( chrono::milliseconds(50));
 }
 for(i = 0; i < THREAD_COUNT; i++){
 lock_guard<mutex> _(m);
 cv.notify_one();
 }
 for(i = 0; i < THREAD_COUNT; i++){
 t[i]->join();
 delete t[i];
 }
 delete t;
}

從上例的執行結果也可以看到,條件變數是不保證次序的,即首先呼叫 wait 的不一定首先被喚醒。  

幾個高階概念

C++11 提供了若干多執行緒程式設計的高階概念:promise/future, packaged_task, async,來簡化多執行緒程式設計,尤其是執行緒之間的資料互動比較簡單的情況下,讓我們可以將注意力更多地放在業務處理上。

promise/future 可以用來線上程之間進行簡單的資料互動,而不需要考慮鎖的問題,執行緒 A 將資料儲存在一個 promise 變數中,另外一個執行緒 B 可以通過這個 promise 變數的 get_future() 獲取其值,當執行緒 A 尚未在 promise 變數中賦值時,執行緒 B 也可以等待這個 promise 變數的賦值:

清單 16.例子 thread_promise_future.cc
promise<string> val;
static void
threadPromiseFuture(){
 thread ta([](){
 future<string> fu = val.get_future();
 cout << "waiting promise->future" << endl;
 cout << fu.get() << endl;
 });
 thread tb([](){
 this_thread::sleep_for( chrono::milliseconds(100) );
 val.set_value("promise is set");
 });
 ta.join();
 tb.join();
}

一個 future 變數只能呼叫一次 get(),如果需要多次呼叫 get(),可以使用 shared_future,通過 promise/future 還可以線上程之間傳遞異常。

如果將一個 callable 物件和一個 promise 組合,那就是 packaged_task,它可以進一步簡化操作:

清單 17.例子 thread_packaged_task.cc
static mutex g_mutex;
static void
threadPackagedTask(){
 auto run = [=](int index){ 
 {
 lock_guard<mutex> _(g_mutex);
 cout << "tasklet " << index << endl;
 }
 this_thread::sleep_for( chrono::seconds(10) );
 return index * 1000;
 };
 packaged_task<int(int)> pt1(run);
 packaged_task<int(int)> pt2(run);
 thread t1([&](){pt1(2);} );
 thread t2([&](){pt2(3);} );
 int f1 = pt1.get_future().get();
 int f2 = pt2.get_future().get();
 cout << "task result=" << f1 << endl;
 cout << "task result=" << f2 << endl;
 t1.join();
 t2.join();
}

我們還可以試圖將一個 packaged_task 和一個執行緒組合,那就是 async() 函式。使用 async() 函式啟動執行程式碼,返回一個 future 物件來儲存程式碼返回值,不需要我們顯式地建立和銷燬執行緒等,而是由 C++11 庫的實現決定何時建立和銷燬執行緒,以及建立幾個執行緒等,示例如下:

清單 18.例子 thread_async.cc
static long
do_sum(vector<long> *arr, size_t start, size_t count){
 static mutex _m;
 long sum = 0;
 for(size_t i = 0; i < count; i++){
 sum += (*arr)[start + i];
 }
 {
 lock_guard<mutex> _(_m);
 cout << "thread " << this_thread::get_id() 
 << ", count=" << count
 << ", sum=" << sum << endl;
 }
 return sum;
}
static void
threadAsync(){
# define COUNT 1000000
 vector<long> data(COUNT);
 for(size_t i = 0; i < COUNT; i++){
 data[i] = random() & 0xff;
 }
 //
 vector< future<long> > result;
 size_t ptc = thread::hardware_concurrency() * 2;
 for(size_t batch = 0; batch < ptc; batch++){
 size_t batch_each = COUNT / ptc;
 if (batch == ptc - 1){
 batch_each = COUNT - (COUNT / ptc * batch);
 }
 result.push_back(async(do_sum, &data, batch * batch_each, batch_each));
 }
 long total = 0;
 for(size_t batch = 0; batch < ptc; batch++){
 total += result[batch].get();
 }
 cout << "total=" << total << endl;
}

  如果是在多核或者多 CPU 的環境上面執行上述例子,仔細觀察輸出結果,可能會發現有些執行緒 ID 是重複的,這說明重複使用了執行緒,也就是說,通過使用 async() 還可達到一些執行緒池的功能。

幾個需要注意的地方

thread 同時也是棉線、毛線、絲線等意思,我想大家都能體會面對一團亂麻不知從何處查詢頭緒的感受,不要忘了,執行緒不是靜態的,它是不斷變化的,請想像一下面對一團會動態變化的亂麻的情景。所以,使用多執行緒技術的首要準則是我們自己要十分清楚我們的執行緒在哪裡?線頭(執行緒入口和出口)在哪裡?先安排好執行緒的執行,注意不同執行緒的交叉點(訪問或者修改同一個資源,包括記憶體、I/O 裝置等),儘量減少執行緒的交叉點,要知道幾條線堆在一起最怕的是互相打結。

當我們的確需要不同執行緒訪問一個共同的資源時,一般都需要進行加鎖保護,否則很可能會出現資料不一致的情況,從而出現各種時現時不現的莫名其妙的問題,加鎖保護時有幾個問題需要特別注意:一是一個執行緒內連續多次呼叫非遞迴鎖 (non-recursive lock) 的加鎖動作,這很可能會導致異常;二是加鎖的粒度;三是出現死鎖 (deadlock),多個執行緒互相等待對方釋放鎖導致這些執行緒全部處於罷工狀態。

第一個問題只要根據場景呼叫合適的鎖即可,當我們可能會在某個執行緒內重複呼叫某個鎖的加鎖動作時,我們應該使用遞迴鎖 (recursive lock),在 C++11 中,可以根據需要來使用 recursive_mutex,或者 recursive_timed_mutex。

第二個問題,即鎖的粒度,原則上應該是粒度越小越好,那意味著阻塞的時間越少,效率更高,比如一個數據庫,給一個數據行 (data row) 加鎖當然比給一個表 (table) 加鎖要高效,但是同時複雜度也會越大,越容易出錯,比如死鎖等。

對於第三個問題我們需要先看下出現死鎖的條件:

  1. 資源互斥,某個資源在某一時刻只能被一個執行緒持有 (hold);
  2. 吃著碗裡的還看著鍋裡的,持有一個以上的互斥資源的執行緒在等待被其它程序持有的互斥資源;
  3. 不可搶佔,只有在某互斥資源的持有執行緒釋放了該資源之後,其它執行緒才能去持有該資源;
  4. 環形等待,有兩個或者兩個以上的執行緒各自持有某些互斥資源,並且各自在等待其它執行緒所持有的互斥資源。

我們只要不讓上述四個條件中的任意一個不成立即可。在設計的時候,非常有必要先分析一下會否出現滿足四個條件的情況,特別是檢查有無試圖去同時保持兩個或者兩個以上的鎖,當我們發現試圖去同時保持兩個或者兩個以上的鎖的時候,就需要特別警惕了。下面我們來看一個簡化了的死鎖的例子:

清單 19.例子 thread_deadlock.cc
static mutex g_mutex1, g_mutex2;
static void
inc1(int *p ){
 for(int i = 0; i < COUNT; i++){
 g_mutex1.lock();
 (*p)++;
 g_mutex2.lock();
 // do something.
 g_mutex2.unlock();
 g_mutex1.unlock();
 }
}
static void
inc2(int *p ){
 for(int i = 0; i < COUNT; i++){
 g_mutex2.lock();
 g_mutex1.lock();
 (*p)++;
 g_mutex1.unlock();
 // do other thing.
 g_mutex2.unlock();
 }
}
void threadMutex(void){
 int a = 0;
 thread ta( inc1, &a);
 thread tb( inc2, &a);
 ta.join();
 tb.join();
 cout << "a=" << a << endl;
}

在這個例子中,g_mutex1 和 g_mutex2 都是互斥的資源,任意時刻都只有一個執行緒可以持有(加鎖成功),而且只有持有執行緒呼叫 unlock 釋放鎖資源的時候其它執行緒才能去持有,滿足條件 1 和 3,執行緒 ta 持有了 g_mutex1 之後,在釋放 g_mutex1 之前試圖去持有 g_mutex2,而執行緒 tb 持有了 g_mutex2 之後,在釋放 g_mutex2 之前試圖去持有 g_mutex1,滿足條件 2 和 4,這種情況之下,當執行緒 ta 試圖去持有 g_mutex2 的時候,如果 tb 正持有 g_mutex2 而試圖去持有 g_mutex1 時就發生了死鎖。在有些環境下,可能要多次執行這個例子才出現死鎖,實際工作中這種偶現特性讓查詢問題變難。要破除這個死鎖,我們只要按如下程式碼所示破除條件 3 和 4 即可:

清單 20.例子 thread_break_deadlock.cc
static mutex g_mutex1, g_mutex2;
static voi
inc1(int *p ){
 for(int i = 0; i < COUNT; i++){
 g_mutex1.lock();
 (*p)++;
 g_mutex1.unlock();
 g_mutex2.lock();
 // do something.
 g_mutex2.unlock();
 }
}
static void
inc2(int *p ){
 for(int i = 0; i < COUNT; i++){
 g_mutex2.lock();
 // do other thing.
 g_mutex2.unlock();
 g_mutex1.lock();
 (*p)++;
 g_mutex1.unlock();
 }
}
void threadMutex(void){
 int a = 0;
 thread ta( inc1, &a);
 thread tb( inc2, &a);
 ta.join();
 tb.join();
 cout << "a=" << a << endl;
}

在一些複雜的並行程式設計場景,如何避免死鎖是一個很重要的話題,在實踐中,當我們看到有兩個鎖巢狀加鎖的時候就要特別提高警惕,它極有可能滿足了條件 2 或者 4。  

結束語

上述例子在 CentOS 6.5,g++ 4.8.1/g++4.9 以及 clang 3.5 下面編譯通過,在編譯的時候,請注意下述幾點:

  • 設定 -std=c++11;
  • 連結的時候設定 -pthread;
  • 使用 g++編譯連結時設定 -Wl,--no-as-needed 傳給連結器,有些版本的 g++需要這個設定;
  • 設定巨集定義 -D_REENTRANT,有些庫函式是依賴於這個巨集定義來確定是否使用多執行緒版本的。

具體可以參考本文所附的程式碼中的 Makefile 檔案。

在用 gdb 除錯多執行緒程式的時候,可以輸入命令 info threads 檢視當前的執行緒列表,通過命令 thread n 切換到第 n 個執行緒的上下文,這裡的 n 是 info threads 命令輸出的執行緒索引數字,例如,如果要切換到第 2 個執行緒的上下文,則輸入命令 thread 2。