1. 程式人生 > >Linux下多執行緒程式設計互斥鎖和條件變數的簡單使用

Linux下多執行緒程式設計互斥鎖和條件變數的簡單使用

Linux下的多執行緒遵循POSIX執行緒介面,稱為pthread。編寫Linux下的多執行緒程式,需要使用標頭檔案pthread.h,連結時需要使用庫libpthread.a。執行緒是程序的一個實體,是CPU排程和分派的基本單位,它是比程序更小的能獨立執行的基本單位。執行緒自己基本上不擁有系統資源,只擁有一點在執行中必不可少的資源(如程式計數器、一組暫存器和棧),但是它可與同屬一個程序的其它的執行緒共享程序所擁有的全部資源。當多個任務可以並行執行時,可以為每個任務啟動一個執行緒。

執行緒是併發執行的。在序列程式基礎上引入執行緒和程序是為了提供程式的併發度,從而提高程式執行效率和響應時間。

與程序相比,執行緒的優勢:(1)、執行緒共享相同的記憶體空間,不同的執行緒可以存取記憶體中的同一個變數;(2)、與標準fork()相比,執行緒帶來的開銷很小,節省了CPU時間,使得執行緒建立比新程序建立快上十到一百倍。

適應多執行緒的理由:(1)、和程序相比,它是一種非常“節儉”的多工操作方式,在Linux系統下,啟動一個新的程序必須分配給它獨立的地址空間,建立眾多的資料表來維護它的程式碼段、堆疊段和資料段,這是一種“昂貴”的多工工作方式。而執行一個程序中的多個執行緒,它們彼此之間使用相同的地址空間,共享大部分資料,啟動一個執行緒花費的空間遠遠小於啟動一個程序所花費的空間,而且,執行緒間彼此切換所需的時間也遠遠小於程序間切換所需要的時間;(2)、執行緒間方便的通訊機制。對不同的程序來說,它們具有獨立的資料空間,要進行資料的傳輸只能通過通訊的方式進行,這種方式不僅費時,而且很不方便。執行緒則不然,同一程序下的執行緒之間共享資料空間,所以一個執行緒的資料可以直接為其它執行緒所用,這不僅快捷,而且方便。

多執行緒程式作為一種多工、併發的工作方式,其優點包括:(1)、提供應用程式響應;(2)、使多CPU系統更加有效:作業系統會保證當執行緒數不大於CPU數目時,不同的執行緒執行在不同的CPU上;(3)、改善程式結構:一個既長又複雜的程序可以考慮分為多個執行緒,成為幾個獨立或半獨立的執行部分,這樣的程式利於理解和修改。

pthread_create:用於在呼叫的程序中建立一個新的執行緒。它有四個引數,第一個引數為指向執行緒識別符號指標;第二個引數用來設定執行緒屬性;第三個引數是執行緒執行函式的起始地址;第四個引數是執行函式的引數。

在一個執行緒中呼叫pthread_create函式建立新的執行緒後,當前執行緒從pthread_create處繼續往下執行。pthread_create函式的第三個引數為新建立執行緒的入口函式的起始地址,此函式接收一個引數,是通過第四個引數傳遞給它的,該引數的型別是void*,這個指標按什麼型別解釋由呼叫者自己定義,入口函式的返回值型別也是void*,這個指標的含義同樣由呼叫者自己定義,入口函式返回時,這個執行緒就退出了,其它執行緒可以呼叫pthread_join函式得到入口函式的返回值。

pthread_join:執行緒阻塞函式,用於阻塞當前的執行緒,直到另外一個執行緒執行結束;使一個執行緒等待另一個執行緒結束;讓主執行緒阻塞在這個地方等待子執行緒結束;程式碼中如果沒有pthread_join主執行緒會很快結束從而使整個程序結束,從而使建立的執行緒沒有機會開始執行就結束了,加入pthread_join後,主執行緒會一直等待直到等待的執行緒結束自己才結束,使建立的執行緒有機會執行。

pthread_create將一個執行緒拆分為兩個,pthread_join()將兩個執行緒合併為一個執行緒。

一個執行緒實際上就是一個函式,建立後,立即被執行,當函式返回時該執行緒也就結束了。

執行緒終止時,一個需要注意的問題是執行緒間的同步問題。一般情況下,程序中各個執行緒的執行是相互獨立的,執行緒的終止並不會相互通知,也不會影響其它執行緒,終止的執行緒所佔用的資源不會隨著執行緒的終止而歸還系統,而是仍然為執行緒所在的程序持有。一個執行緒僅允許一個執行緒使用pthread_join等待它的終止,並且被等待的執行緒應該處於可join狀態,而非DETACHED狀態。一個可”join”的執行緒所佔用的記憶體僅當有執行緒對其執行了pthread_join()後才會釋放,因此為了避免記憶體洩露,所有執行緒終止時,要麼設為DETACHED,要麼使用pthread_join來回收資源。一個執行緒不能被多個執行緒等待。

所有執行緒都有一個執行緒號,也就是threadid,其型別為pthread_t,通過呼叫pthread_self函式可以獲得自身的執行緒號。

Linux執行緒同步的幾種基本方式:join、互斥鎖(mutex)、讀寫鎖(read-writelock)、條件變數(condition variables)。mutex的本質是鎖,而條件變數的本質是等待。

互斥:簡單的理解就是,一個執行緒進入工作區後,如果有其它執行緒想要進入工作區,它就會進入等待狀態,要等待工作區內的執行緒結束後才可以進入。

互斥提供執行緒間資源的獨佔訪問控制。它是一個簡單的鎖,只有持有它的執行緒才可以釋放那個互斥。它確保了它們正在訪問的共享資源的完整性,因為在同一時刻只允許一個執行緒訪問它。

互斥操作,就是對某段程式碼或某個變數修改的時候只能有一個執行緒在執行這段程式碼,其它執行緒不能同時進入這段程式碼或同時修改該變數。這個程式碼或變數稱為臨界資源。

通過鎖機制實現執行緒間的同步,同一時刻只允許一個執行緒執行一個關鍵部分的程式碼。

有兩種方式建立互斥鎖,靜態方式和動態方式。

在預設情況下,Linux下的同一執行緒無法對同一互斥鎖進行遞迴加鎖,否則將發生死鎖。所謂遞迴加鎖,就是在同一執行緒中試圖對互斥鎖進行兩次或兩次以上的行為。解決問題的方法就是顯示地在互斥變數初始化時將其設定成recursive屬性。

互斥量是一種用於多執行緒中的同步訪問的方法,它允許程式鎖住某個物件或者某段程式碼,使得每次只能有一個執行緒訪問它。為了控制對關鍵物件或者程式碼的訪問,必須在進入這段程式碼之前鎖住一個互斥量,然後在完成操作之後解鎖。

互斥量用pthread_mutex_t資料型別來表示,在使用互斥變數之前,必須首先對它進行初始化,可以把它置為常量PTHREAD_MUTEX_INITIALIZER(只對靜態分配的互斥量),也可以通過呼叫pthread_mutex_init函式進行初始化。如果動態地分配互斥量(如呼叫malloc)函式,那麼釋放記憶體前(free)需要使用pthread_mutex_destroy函式。

對共享資源的訪問,要對互斥量進行加鎖,如果互斥量已經上了鎖,呼叫執行緒會阻塞,直到互斥量被解鎖。在完成了對共享資源的訪問後,要對互斥量進行解鎖。

pthread_mutex_init函式:主要用於多執行緒中互斥鎖的初始化。如果要用預設的屬性初始化互斥量,只需把第二個引數設定為NULL。互斥量的屬性可以分為四種:(1)、PTHREAD_MUTEX_TIMED_NP,這是預設值,也就是普通鎖,當一個執行緒加鎖以後,其餘請求鎖的執行緒將形成一個等待佇列,並在解鎖後按優先順序獲得鎖,這種鎖策略保證了資源分配的公平性;(2)、PTHREAD_MUTEX_RECURSIVE_NP,巢狀鎖,允許執行緒多次加鎖,不同執行緒,解鎖後重新競爭;(3)、PTHREAD_MUTEX_ERRORCHECK_NP,檢錯,如果該互斥量已經被上鎖,那麼後續的上鎖將會失敗而不會阻塞,否則與PTHREAD_MUTEX_TIMED_NP型別相同,這樣就保證當不允許多次加鎖時不會出現最簡單情況下的死鎖;(4)、PTHREAD_MUTEX_ADAPTIVE_NP,適應鎖,動作最簡單的鎖型別,僅等待解鎖後重新競爭。

pthread_mutex_destroy函式:銷燬(登出)執行緒互斥鎖;銷燬一個互斥鎖即意味著釋放它所佔用的資源,且要求鎖當前處於開放狀態。

pthread_mutex_lock:佔有互斥鎖(阻塞操作);互斥鎖被鎖定,如果這個互斥鎖被一個執行緒鎖定和擁有,那麼另一個執行緒要呼叫這個函式就會進入阻塞狀態(即等待狀態),直到互斥鎖被釋放為止;互斥量一旦被上鎖後,其它執行緒如果想給該互斥量上鎖,那麼就會阻塞在這個操作上,如果在此之前該互斥量已經被其它執行緒上鎖,那麼該操作將會一直阻塞在這個地方,直到獲得該鎖為止。

pthread_mutex_unlock:釋放互斥鎖;在操作完成後,必須呼叫該函式給互斥量解鎖,這樣其它等待該鎖的執行緒才有機會獲得該鎖,否則其它執行緒將會永遠阻塞。

與互斥鎖不同,條件變數是用來等待而不是用來上鎖的。條件變數用來自動阻塞一個執行緒,直到某特殊情況發生為止。通常條件變數和互斥鎖同時使用。條件變數分為兩部分:條件和變數。條件本身是由互斥量保護的。執行緒在改變條件狀態前先要鎖住互斥量。條件變數使我們可以睡眠等待某種條件出現。條件變數是利用執行緒間共享的全域性變數進行同步的一種機制,主要包括兩個動作:一個執行緒等待“條件變數的條件成立”而掛起;另一個執行緒使“條件成立”(給出條件成立訊號)。條件的檢測是在互斥鎖的保護下進行的。如果一個條件為假,一個執行緒自動阻塞,並釋放等待狀態改變的互斥鎖。如果另一個執行緒改變了條件,它發訊號給關聯的條件變數,喚醒一個或多個等待它的執行緒,重新獲得互斥鎖,重新評價條件。如果兩執行緒共享可讀寫的記憶體,條件變數可以被用來實現這兩執行緒間的執行緒同步。

互斥鎖一個明顯的缺點是它只有兩種狀態,鎖定和非鎖定。而條件變數通過允許執行緒阻塞和等待另一個執行緒傳送訊號的方法彌補了互斥鎖的不足,它常和互斥鎖一起使用。使用時,條件變數被用來阻塞一個執行緒,當條件不滿足時,執行緒往往解開相應的互斥鎖並等待條件發生變化。一旦其它的某個執行緒改變了條件變數,它將通知相應的條件變數喚醒一個或多個正被此條件變數阻塞的執行緒。這些執行緒將重新鎖定互斥鎖並重新測試條件是否滿足。一般來說,條件變數被用來進行執行緒間的同步。條件變數只是起阻塞和喚醒執行緒的作用,具體的判斷條件還需使用者給出。執行緒被喚醒後,它將重新檢查判斷條件是否滿足,如果還不滿足,一般說來執行緒應該仍阻塞在這裡,被等待被下一次喚醒。這個過程一般用while語句實現。

條件變數用pthread_cond_t結構體來表示。

pthread_cond_init:初始化一個條件變數,當第二個引數屬性為空指標時,函式建立的是一個預設的條件變數,���則條件變數的屬性將由第二個引數的屬性值來決定。不能由多個執行緒同時初始化一個條件變數。當需要重新初始化或釋放一個條件變數時,應用程式必須保證這個條件變數未被使用。

pthread_cond_wait:阻塞在條件變數上,函式將解鎖第二個引數指向的互斥鎖,並使當前執行緒阻塞在第一個引數指向的條件變數上。被阻塞的執行緒可以被pthread_cond_signal、pthread_cond_broadcast函式喚醒,也可能在被訊號中斷後被喚醒。

一般一個條件表示式都是在一個互斥鎖的保護下被檢查。當條件表示式未被滿足時,執行緒將仍然阻塞在這個條件變數上。當另一個執行緒改變了條件的值並向條件變數發出訊號時,等待在這個條件變數上的一個執行緒或所有執行緒被喚醒,接著都試圖再次佔有相應的互斥鎖。阻塞在條件變數上的執行緒被喚醒以後,直到pthread_cond_wait函式返回之前,條件的值都有可能發生變化。所以函式返回以後,在鎖定相應的互斥鎖之前,必須重新測試條件值。最好的測試方法是迴圈呼叫pthread_cond_wait函式,並把滿足條件的表示式置為迴圈的終止條件。阻塞在同一個條件變數上的不同執行緒被釋放的次序是不一定的。

pthread_cond_wait函式是退出點,如果在呼叫這個函式時,已有一個掛起的退出請求,且執行緒允許退出,這個執行緒將被終止並開始執行善後處理函式,而這時和條件變數相關的互斥鎖仍將處在鎖定狀態。

pthread_cond_signal:解除在條件變數上的阻塞。此函式被用來釋放被阻塞在指定條件變數上的一個執行緒。一般在互斥鎖的保護下使用相應的條件變數,否則對條件變數的解鎖有可能發生在鎖定條件變數之前,從而造成死鎖。喚醒阻塞在條件變數上的所有執行緒的順序由排程策略決定。

pthread_cond_timewait:阻塞直到指定時間。函式到了一定的時間,即使條件未發生也會解除阻塞。這個時間由第三個引數指定。

pthread_cond_broadcast:釋放阻塞的所有執行緒。函式喚醒所有被pthread_cond_wait函式阻塞在某個條件變數上的執行緒。當沒有執行緒阻塞在這個條件變數上時,此函式無效。此函式喚醒所有阻塞在某個條件變數上的執行緒,這些執行緒被喚醒後將再次競爭相應的互斥鎖。

pthread_cond_destroy:釋放條件變數。條件變數佔用的空間未被釋放。

pthread_cond_wait和pthread_cond_timewait一定要在mutex的鎖定區域內使用;而pthread_cond_signal和pthread_cond_broadcoast無需考慮呼叫執行緒是否是mutex的擁有者,可以在lock與unlock以外的區域呼叫。

一個特定條件只能有一個互斥物件,而且條件變數應該表示互斥資料“內部”的一種特殊的條件更改。一個互斥物件可以有許多條件變數,但每個條件變數只能有一個互斥物件。

以上所有執行緒相關函式,函式執行成功時返回0,返回其它非0值表示錯誤。

以下是一些測試例子:

1. test_create_thread.cpp:

#include <pthread.h>
#include <iostream>
#include <unistd.h>

using namespace std;

void* run(void* para)
{
 cout<<"start new thread!"<<endl;
 
 //sleep(5);//suspend 5 s,在正式的程式碼中,一般不要用sleep函式
 int* iptr = (int*)((void**)para)[0];
 float* fptr = (float*)((void**)para)[1];
 char* str = (char*)((void**)para)[2];
 cout << *iptr << "    " << *fptr << "    " << str << endl;

 cout<<"end new thread!"<<endl;
 
 return ((void *)0);
}

int main()
{
 pthread_t pid;//thread handle
 int err = -1;
 int ival = 1;
 float fval = 10.0F;
 char buf[] = "func";
 void* para[3] = { &ival, &fval, buf };

 err = pthread_create(&pid, NULL, run, para);
 if (err != 0) {
  cout << "can't create thread!" << endl;
  return -1;
 }

 //新執行緒建立之後主執行緒如何執行----主執行緒按順序繼續執行下一行程式
 cout << "main thread!" << endl;
 
 //新執行緒結束時如何處理----新執行緒先停止,然後作為其清理過程的一部分,等待與另一個執行緒合併或“連線”
 pthread_join(pid, NULL);

 cout << "ok!" << endl;

 return 0;
}

//終端執行:$ g++ -o test_create_thread test_create_thread.cpp -lpthread
//    $ ./test_create_thread

2. test_thread_mutex.cpp:

#include <pthread.h>
#include <iostream>

using namespace std;

pthread_t tid[2];
int counter = 0;
pthread_mutex_t lock;

void* run(void* arg)
{
 pthread_mutex_lock(&lock);

 unsigned long i = 0;
 counter += 1;
 cout << "Job " << counter << " started!" << endl;
 for (i = 0; i<(0xFFFFFFFF); i++);
 cout << "Job " << counter << " finished!" << endl;

 pthread_mutex_unlock(&lock);

 return NULL;
}

int main()
{
 int i = 0, err = -1;

 if (pthread_mutex_init(&lock, NULL) != 0) {
  cout << "mutex init failed" << endl;
  return -1;
 }

 while (i < 2) {
  err = pthread_create(&(tid[i]), NULL, &run, NULL);
  if (err != 0)
   cout << "can't create thread!" << endl;
  i++;
 }

 pthread_join(tid[0], NULL);
 pthread_join(tid[1], NULL);
 pthread_mutex_destroy(&lock);

 cout << "ok!" << endl;
 return 0;
}

//終端執行:$ g++ -o test_thread_mutex test_thread_mutex.cpp -lpthread
//    $ ./test_thread_mutex

3. test_thread_cond.cpp:

#include <pthread.h>
#include <iostream>
#include <unistd.h>

using namespace std;

pthread_mutex_t count_lock;
pthread_cond_t count_nonzero;
unsigned count = 0;

void* decrement_count(void* arg)
{
 pthread_mutex_lock(&count_lock);
 cout << "decrement_count get count_lock" << endl;

 while (count == 0) {
  cout << "decrement_count count == 0" << endl;

  cout << "decrement_count before cond_wait" << endl;
  pthread_cond_wait(&count_nonzero, &count_lock);
  cout << "decrement_count after cond_wait" << endl;
 }

 count = count + 1;

 pthread_mutex_unlock(&count_lock);

 return NULL;
}

void* increment_count(void* arg)
{
 pthread_mutex_lock(&count_lock);
 cout << "increment_count get count_lock" << endl;

 if (count == 0) {
  cout << "increment_count before cond_signal" << endl;
  pthread_cond_signal(&count_nonzero);
  cout << "increment_count after cond_signal" << endl;
 }

 count = count + 1;

 pthread_mutex_unlock(&count_lock);

 return NULL;
}

int main()
{
 pthread_t tid1, tid2;

 pthread_mutex_init(&count_lock, NULL);
 pthread_cond_init(&count_nonzero, NULL);

 pthread_create(&tid1, NULL, decrement_count, NULL);
 sleep(2);

 pthread_create(&tid2, NULL, increment_count, NULL);
 sleep(2);

 pthread_join(tid1, NULL);
 pthread_join(tid2, NULL);
 pthread_mutex_destroy(&count_lock);
 pthread_cond_destroy(&count_nonzero);

 cout << "ok!" << endl;
}

//終端執行:$ g++ -o test_thread_cond test_thread_cond.cpp -lpthread
//    $ ./test_thread_cond

4. test_thread_cond1.cpp:

#include <pthread.h>
#include <iostream>
#include <unistd.h>

using namespace std;

pthread_mutex_t counter_lock;
pthread_cond_t counter_nonzero;
int counter = 0;

void* decrement_counter(void* argv);
void* increment_counter(void* argv);

int main()
{
 cout << "counter: " << counter << endl;
 pthread_mutex_init(&counter_lock, NULL);
 pthread_cond_init(&counter_nonzero, NULL);

 pthread_t thd1, thd2;
 int ret = -1;

 ret = pthread_create(&thd1, NULL, decrement_counter, NULL);
 if (ret){
  cout << "create thread1 fail" << endl;
  return -1;
 }

 ret = pthread_create(&thd2, NULL, increment_counter, NULL);
 if (ret){
  cout << "create thread2 fail" << endl;
  return -1;
 }

 int counter = 0;
 while (counter != 10) {
  cout << "counter(main): " << counter << endl;
  sleep(1);
  counter++;
 }

 pthread_join(thd1, NULL);
 pthread_join(thd2, NULL);
 pthread_mutex_destroy(&counter_lock);
 pthread_cond_destroy(&counter_nonzero);

 cout << "ok!" << endl;
}

void* decrement_counter(void* argv)
{
 cout << "counter(decrement): " << counter << endl;

 pthread_mutex_lock(&counter_lock);
 while (counter == 0)
  pthread_cond_wait(&counter_nonzero, &counter_lock); //進入阻塞(wait),等待啟用(signal)

 cout << "counter--(decrement, before): " << counter << endl;
 counter--; //等待signal啟用後再執行  
 cout << "counter--(decrement, after): " << counter << endl;
 pthread_mutex_unlock(&counter_lock);

 return NULL;
}

void* increment_counter(void* argv)
{
 cout << "counter(increment): " << counter << endl;

 pthread_mutex_lock(&counter_lock);
 if (counter == 0)
  pthread_cond_signal(&counter_nonzero); //啟用(signal)阻塞(wait)的執行緒(先執行完signal執行緒,然後再執行wait執行緒) 

 cout << "counter++(increment, before): " << counter << endl;
 counter++;
 cout << "counter++(increment, after): " << counter << endl;
 pthread_mutex_unlock(&counter_lock);

 return NULL;
}

//終端執行:$ g++ -o test_thread_cond1 test_thread_cond1.cpp -lpthread
//    $ ./test_thread_cond1

5. test_thread_cond2.cpp:

#include <pthread.h>
#include <iostream>
#include <unistd.h>

using namespace std;

pthread_mutex_t counter_lock1, counter_lock2;
pthread_cond_t counter_nonzero1, counter_nonzero2;
int counter = 0;

void* decrement_increment_counter(void* argv);

int main()
{
 cout << "counter: " << counter << endl;
 pthread_mutex_init(&counter_lock1, NULL);
 pthread_mutex_init(&counter_lock2, NULL);
 pthread_cond_init(&counter_nonzero1, NULL);
 pthread_cond_init(&counter_nonzero2, NULL);

 pthread_t thd;
 int ret = -1;

 ret = pthread_create(&thd, NULL, decrement_increment_counter, NULL);
 if (ret){
  cout << "create thread1 fail" << endl;
  return -1;
 }

 int counter = 0;
 while (counter != 10) {
  cout << "counter(main): " << counter << endl;
  sleep(1);
  counter++;
 }

 pthread_join(thd, NULL);
 pthread_mutex_destroy(&counter_lock1);
 pthread_mutex_destroy(&counter_lock2);
 pthread_cond_destroy(&counter_nonzero1);
 pthread_cond_destroy(&counter_nonzero2);

 cout << "ok!" << endl;
}

void* decrement_increment_counter(void* argv)
{
 cout << "start counter: " << counter << endl;

 pthread_mutex_lock(&counter_lock1);
 cout << "counter(decrement): " << counter << endl;
 while (counter == 1)
  pthread_cond_wait(&counter_nonzero1, &counter_lock1); //進入阻塞(wait),等待啟用(signal)

 cout << "counter--(decrement, before): " << counter << endl;
 counter--; //等待signal啟用後再執行  
 cout << "counter--(decrement, after): " << counter << endl;
 pthread_mutex_unlock(&counter_lock1);

 pthread_mutex_lock(&counter_lock2);
 cout << "counter(increment): " << counter << endl;
 if (counter == 0)
  pthread_cond_signal(&counter_nonzero2); //啟用(signal)阻塞(wait)的執行緒(先執行完signal執行緒,然後再執行wait執行緒) 

 cout << "counter++(increment, before): " << counter << endl;
 counter++;
 cout << "counter++(increment, after): " << counter << endl;
 pthread_mutex_unlock(&counter_lock2);

 return NULL;
}

//終端執行:$ g++ -o test_thread_cond2 test_thread_cond2.cpp -lpthread
//    $ ./test_thread_cond2