1. 程式人生 > >boost::thread執行緒管理

boost::thread執行緒管理

雖然多執行緒的使用可以提高應用程式的效能,但也增加了複雜性。 如果使用執行緒在同一時間執行幾個函式,訪問共享資源時必須相應地同步。 一旦應用達到了一定規模,這涉及相當一些工作。 本段介紹了Boost.Thread提供同步執行緒的類。

#include <boost/thread.hpp> 
#include <iostream> 

void wait(int seconds) 
{ 
  boost::this_thread::sleep(boost::posix_time::seconds(seconds)); 
} 

boost::mutex mutex; 

void thread() 
{ 
  for (int i = 0; i < 5; ++i) 
  { 
    wait(1); 
    mutex.lock(); 
    std::cout << "Thread " << boost::this_thread::get_id() << ": " << i << std::endl; 
    mutex.unlock(); 
  } 
} 

int main() 
{ 
  boost::thread t1(thread); 
  boost::thread t2(thread); 
  t1.join(); 
  t2.join(); 
} 

多執行緒程式使用所謂的互斥物件來同步。 Boost.Thread提供多個的互斥類,boost::mutex是最簡單的一個。 互斥的基本原則是當一個特定的執行緒擁有資源的時候防止其他執行緒奪取其所有權。 一旦釋放,其他的執行緒可以取得所有權。 這將導致執行緒等待至另一個執行緒完成處理一些操作,從而相應地釋放互斥物件的所有權。

上面的示例使用一個型別為 boost::mutexmutex 全域性互斥物件。thread() 函式獲取此物件的所有權才在 for 迴圈內使用lock() 方法寫入到標準輸出流的。 一旦資訊被寫入,使用 unlock() 方法釋放所有權。

main() 建立兩個執行緒,同時執行 thread ()

函式。 利用 for 迴圈,每個執行緒數到5,用一個迭代器寫一條訊息到標準輸出流。 不幸的是,標準輸出流是一個全域性性的被所有執行緒共享的物件。 該標準不提供任何保證std::cout 可以安全地從多個執行緒訪問。 因此,訪問標準輸出流必須同步:在任何時候,只有一個執行緒可以訪問 std::cout

由於兩個執行緒試圖在寫入標準輸出流前獲得互斥體,實際上只能保證一次只有一個執行緒訪問 std::cout。 不管哪個執行緒成功呼叫 lock() 方法,其他所有執行緒必須等待,直到 unlock() 被呼叫。

獲取和釋放互斥體是一個典型的模式,是由Boost.Thread通過不同的資料型別支援。 例如,不直接地呼叫 lock()

unlock(),使用 boost::lock_guard 類也是可以的。

#include <boost/thread.hpp> 
#include <iostream> 

void wait(int seconds) 
{ 
  boost::this_thread::sleep(boost::posix_time::seconds(seconds)); 
} 

boost::mutex mutex; 

void thread() 
{ 
  for (int i = 0; i < 5; ++i) 
  { 
    wait(1); 
    boost::lock_guard<boost::mutex> lock(mutex); 
    std::cout << "Thread " << boost::this_thread::get_id() << ": " << i << std::endl; 
  } 
} 

int main() 
{ 
  boost::thread t1(thread); 
  boost::thread t2(thread); 
  t1.join(); 
  t2.join(); 
} 

boost::lock_guard 在其內部構造和解構函式分別自動呼叫 lock()unlock() 。 訪問共享資源是需要同步的,因為它顯示地被兩個方法呼叫。boost::lock_guard 類是另一個出現在 第 2 章 智慧指標 的RAII用語。

除了boost::mutexboost::lock_guard 之外,Boost.Thread也提供其他的類支援各種同步。 其中一個重要的就是boost::unique_lock ,相比較 boost::lock_guard而言,它提供許多有用的方法。

#include <boost/thread.hpp> 
#include <iostream> 

void wait(int seconds) 
{ 
  boost::this_thread::sleep(boost::posix_time::seconds(seconds)); 
} 

boost::timed_mutex mutex; 

void thread() 
{ 
  for (int i = 0; i < 5; ++i) 
  { 
    wait(1); 
    boost::unique_lock<boost::timed_mutex> lock(mutex, boost::try_to_lock); 
    if (!lock.owns_lock()) 
      lock.timed_lock(boost::get_system_time() + boost::posix_time::seconds(1)); 
    std::cout << "Thread " << boost::this_thread::get_id() << ": " << i << std::endl; 
    boost::timed_mutex *m = lock.release(); 
    m->unlock(); 
  } 
} 

int main() 
{ 
  boost::thread t1(thread); 
  boost::thread t2(thread); 
  t1.join(); 
  t2.join(); 
} 

上面的例子用不同的方法來演示 boost::unique_lock 的功能。 當然了,這些功能的用法對給定的情景不一定適用;boost::lock_guard 在上個例子的用法還是挺合理的。 這個例子就是為了演示boost::unique_lock 提供的功能。

boost::unique_lock 通過多個建構函式來提供不同的方式獲得互斥體。 這個期望獲得互斥體的函式簡單地呼叫了lock() 方法,一直等到獲得這個互斥體。 所以它的行為跟 boost::lock_guard 的那個是一樣的。

如果第二個引數傳入一個 boost::try_to_lock 型別的值,對應的建構函式就會呼叫 try_lock() 方法。 這個方法返回 bool 型的值:如果能夠獲得互斥體則返回true,否則返回false 。 相比 lock() 函式,try_lock() 會立即返回,而且在獲得互斥體之前不會被阻塞。

上面的程式向 boost::unique_lock 的建構函式的第二個引數傳入boost::try_to_lock。 然後通過owns_lock() 可以檢查是否可獲得互斥體。 如果不能, owns_lock() 返回 false。 這也用到 boost::unique_lock 提供的另外一個函式: timed_lock() 等待一定的時間以獲得互斥體。 給定的程式等待長達1秒,應較足夠的時間來獲取更多的互斥。

其實這個例子顯示了三個方法獲取一個互斥體:lock() 會一直等待,直到獲得一個互斥體。 try_lock() 則不會等待,但如果它只會在互斥體可用的時候才能獲得,否則返回 false 。 最後,timed_lock() 試圖獲得在一定的時間內獲取互斥體。 和try_lock() 一樣,返回bool 型別的值意味著成功是否。

雖然 boost::mutex 提供了 lock()try_lock() 兩個方法,但是 boost::timed_mutex 只支援timed_lock() ,這就是上面示例那麼使用的原因。 如果不用 timed_lock() 的話,也可以像以前的例子那樣用 boost::mutex

就像 boost::lock_guard 一樣, boost::unique_lock 的解構函式也會相應地釋放互斥量。此外,可以手動地用 unlock() 釋放互斥量。也可以像上面的例子那樣,通過呼叫release() 解除boost::unique_lock 和互斥量之間的關聯。然而在這種情況下,必須顯式地呼叫unlock() 方法來釋放互斥量,因為 boost::unique_lock 的解構函式不再做這件事情。

boost::unique_lock 這個所謂的獨佔鎖意味著一個互斥量同時只能被一個執行緒獲取。 其他執行緒必須等待,直到互斥體再次被釋放。 除了獨佔鎖,還有非獨佔鎖。 Boost.Thread裡有個 boost::shared_lock 的類提供了非獨佔鎖。 正如下面的例子,這個類必須和boost::shared_mutex 型的互斥量一起使用。

#include <boost/thread.hpp> 
#include <iostream> 
#include <vector> 
#include <cstdlib> 
#include <ctime> 

void wait(int seconds) 
{ 
  boost::this_thread::sleep(boost::posix_time::seconds(seconds)); 
} 

boost::shared_mutex mutex; 
std::vector<int> random_numbers; 

void fill() 
{ 
  std::srand(static_cast<unsigned int>(std::time(0))); 
  for (int i = 0; i < 3; ++i) 
  { 
    boost::unique_lock<boost::shared_mutex> lock(mutex); 
    random_numbers.push_back(std::rand()); 
    lock.unlock(); 
    wait(1); 
  } 
} 

void print() 
{ 
  for (int i = 0; i < 3; ++i) 
  { 
    wait(1); 
    boost::shared_lock<boost::shared_mutex> lock(mutex); 
    std::cout << random_numbers.back() << std::endl; 
  } 
} 

int sum = 0; 

void count() 
{ 
  for (int i = 0; i < 3; ++i) 
  { 
    wait(1); 
    boost::shared_lock<boost::shared_mutex> lock(mutex); 
    sum += random_numbers.back(); 
  } 
} 

int main() 
{ 
  boost::thread t1(fill); 
  boost::thread t2(print); 
  boost::thread t3(count); 
  t1.join(); 
  t2.join(); 
  t3.join(); 
  std::cout << "Sum: " << sum << std::endl; 
} 

boost::shared_lock 型別的非獨佔鎖可以線上程只對某個資源讀訪問的情況下使用。 一個執行緒修改的資源需要寫訪問,因此需要一個獨佔鎖。 這樣做也很明顯:只需要讀訪問的執行緒不需要知道同一時間其他執行緒是否訪問。 因此非獨佔鎖可以共享一個互斥體。

在給定的例子, print()count() 都可以只讀訪問random_numbers 。 雖然 print() 函式把 random_numbers 裡的最後一個數寫到標準輸出,count() 函式把它統計到sum 變數。 由於沒有函式修改 random_numbers,所有的都可以在同一時間用 boost::shared_lock 型別的非獨佔鎖訪問它。

fill() 函式裡,需要用一個 boost::unique_lock 型別的非獨佔鎖,因為它插入了一個新的隨機數到random_numbers。 在 unlock() 顯式地呼叫 unlock() 來釋放互斥量之後, fill() 等待了一秒。 相比於之前的那個樣子, 在for 迴圈的尾部呼叫 wait() 以保證容器裡至少存在一個隨機數,可以被print() 或者count() 訪問。 對應地,這兩個函式在 for 迴圈的開始呼叫了wait()

考慮到在不同的地方每個單獨地呼叫 wait() ,一個潛在的問題變得很明顯:函式呼叫的順序直接受CPU執行每個獨立程序的順序決定。 利用所謂的條件變數,可以同步哪些獨立的執行緒,使陣列的每個元素都被不同的執行緒立即新增到random_numbers

#include <boost/thread.hpp> 
#include <iostream> 
#include <vector> 
#include <cstdlib> 
#include <ctime> 

boost::mutex mutex; 
boost::condition_variable_any cond; 
std::vector<int> random_numbers; 

void fill() 
{ 
  std::srand(static_cast<unsigned int>(std::time(0))); 
  for (int i = 0; i < 3; ++i) 
  { 
    boost::unique_lock<boost::mutex> lock(mutex); 
    random_numbers.push_back(std::rand()); 
    cond.notify_all(); 
    cond.wait(mutex); 
  } 
} 

void print() 
{ 
  std::size_t next_size = 1; 
  for (int i = 0; i < 3; ++i) 
  { 
    boost::unique_lock<boost::mutex> lock(mutex); 
    while (random_numbers.size() != next_size) 
      cond.wait(mutex); 
    std::cout << random_numbers.back() << std::endl; 
    ++next_size; 
    cond.notify_all(); 
  } 
} 

int main() 
{ 
  boost::thread t1(fill); 
  boost::thread t2(print); 
  t1.join(); 
  t2.join(); 
} 

這個例子的程式刪除了 wait()count() 。執行緒不用在每個迴圈迭代中等待一秒,而是儘可能快地執行。此外,沒有計算總額;數字完全寫入標準輸出流。

為確保正確地處理隨機數,需要一個允許檢查多個執行緒之間特定條件的條件變數來同步不每個獨立的執行緒。

正如上面所說, fill() 函式用在每個迭代產生一個隨機數,然後放在 random_numbers 容器中。 為了防止其他執行緒同時訪問這個容器,就要相應得使用一個排它鎖。 不是等待一秒,實際上這個例子卻用了一個條件變數。 呼叫notify_all() 會喚醒每個哪些正在分別通過呼叫wait() 等待此通知的執行緒。

通過檢視 print() 函式裡的 for 迴圈,可以看到相同的條件變數被wait() 函式呼叫了。 如果這個執行緒被 notify_all() 喚醒,它就會試圖這個互斥量,但只有在fill() 函式完全釋放之後才能成功。

這裡的竅門就是呼叫 wait() 會釋放相應的被引數傳入的互斥量。 在呼叫 notify_all()後, fill() 函式會通過 wait() 相應地釋放執行緒。 然後它會阻止和等待其他的執行緒呼叫 notify_all() ,一旦隨機數已寫入標準輸出流,這就會在print() 裡發生。

注意到在 print() 函式裡呼叫 wait() 事實上發生在一個單獨while 迴圈裡。 這樣做的目的是為了處理在 print() 函式裡第一次呼叫wait() 函式之前隨機數已經放到容器裡。 通過比較 random_numbers 裡元素的數目與預期值,發現這成功地處理了把隨機數寫入到標準輸出流。