1. 程式人生 > >一步一步學linux多執行緒程式設計

一步一步學linux多執行緒程式設計

windows下的多執行緒已經很熟悉了,本以為遷移到linux很容易,但總是感覺心裡沒底,函式名都不一樣。所以現在把linux下的多執行緒完整的走一遍。

1、建立執行緒

windows下用_beginthreadex來開啟一個執行緒,那麼linux呢? 那就用pthread_create。相比之下,pthread_create引數更少,引數是pid, attr, startaddr, param。

標頭檔案

#include<pthread.h>

函式宣告
int pthread_create(pthread_t *tidp,const pthread_attr_t *attr,(void*)(*start_rtn)(void*),void *arg);

編譯連結引數
-pthread

返回值


若執行緒建立成功,則返回0。若執行緒建立失敗,則返回出錯編號,並且*thread中的內容是未定義的。返回成功時,由tidp指向的記憶體單元被設定為新建立執行緒的執行緒ID。attr引數

用於指定各種不同的執行緒屬性。新建立的執行緒從start_rtn函式的地址開始執行,該函式只有一個萬能指標引數arg,如果需要向start_rtn函式傳遞的引數不止一個,那麼需要把這

些引數放到一個結構中,然後把這個結構的地址作為arg的引數傳入。

按照UNIX慣例,函式成功時返回0,失敗時返回-1,但pthread_XXX系列函式並未遵循這個慣例,在pthread_XXX失敗時返回的是錯誤碼。

#include <stdio.h>
#include <unistd.h>
#include <pthread.h>

void *thread_fun(void *param)
{
    for ( int i = 0; i < 10; ++i)
    {
        printf("this is thread fun %d...\n", i);
    }
}

int main()
{
    pthread_t tid;
    if ( 0 != pthread_create(&tid, NULL, thread_fun, NULL) )
    {
        printf("create thread failed!\n");
    }
    for ( int i = 0; i < 10; ++i)
    {
        printf("this is main function %d ...\n", i);
    }
}


怎麼全都是main function,難道執行緒建立失敗了?做了判斷了啊。其實這就是執行緒的第一個問題,和執行緒的排程有關,就詳細說了,簡單一句話:thread執行緒還沒開始執行main就已經執行完了,由於main退出則程序結束,程序中的所有執行緒也就結束了,如果還要看,則可以把main中的 迴圈次數加大,則會看到thread也列印了。

2、等待執行緒

由前面可以知道,thread還沒來得急執行main就退出了,那有沒有什麼辦法呢?當然有,而且很多,只要能讓main阻塞的操作都可以,比如:等待輸入,sleep,空轉迴圈等等。但這些始終感覺是小門小道,難登大雅之堂。在windows中有WaitForSingleObject來等待物件,linux中有pthread_join來等待。

函式pthread_join用來等待一個執行緒的結束。
標頭檔案 :
#include <pthread.h>
函式定義: 

int pthread_join(pthread_t thread, void **retval);

描述 :
pthread_join()函式,以阻塞的方式等待thread指定的執行緒結束。當函式返回時,被等待執行緒的資源被收回。如果執行緒已經結束,那麼該函式會立即返回。並且thread指定的執行緒必須是joinable的。

引數 :
thread: 執行緒識別符號,即執行緒ID,標識唯一執行緒。retval: 使用者定義的指標,用來儲存被等待執行緒的返回值。

返回值 : 
0代表成功。 失敗,返回的則是錯誤號。

程式碼中如果沒有pthread_join主執行緒會很快結束從而使整個程序結束,從而使建立的執行緒沒有機會開始執行就結束了。加入pthread_join後,主執行緒會一直等待直到等待的執行緒結束自己才結束,使建立的執行緒有機會執行。所有執行緒都有一個執行緒號,也就是Thread ID。其型別為pthread_t。通過呼叫pthread_self()函式可以獲得自身的執行緒號另外需要說明的是一個執行緒不能被多個執行緒等待,也就是說對一個執行緒只能呼叫一次pthread_join,否則只有一個能正確返回,其他的將返回ESRCH 錯誤。

#include <stdio.h>
#include <unistd.h>
#include <pthread.h>
#include <string.h>

void *thread_fun(void *param)
{
    for ( int i = 0; i < 10; ++i)
    {
        printf("this is thread fun %d...\n", i);
    }
    const static char *p = "I'am finish, thank you call me...";
    return (void *)p;
}

int main()
{
    pthread_t tid;
    if ( 0 != pthread_create(&tid, NULL, thread_fun, NULL) )
    {
        printf("create thread failed!\n");
    }
    for ( int i = 0; i < 10; ++i)
    {
        printf("this is main function %d ...\n", i);
    }

    char *p = NULL;
    pthread_join(tid, (void **)&p);
    printf("all finish! thank you! %s\n", p);
}

呼叫了pthread_join來等待執行緒返回,為了更具客觀性,順便獲取執行緒的返回值。


由此可見,執行緒也執行完全了,可能有人會問,這兒怎麼顯示main函式執行,再是執行緒函式執行,是不是有順序問題?當然沒有,執行是隨機的,只不過這兒只迴圈10次,次數很少了,一下子就執行完了。pthread_join除了具有等待的效果外,還有資源釋放的關係,簡單說就是用預設屬性建立的執行緒如果不呼叫pthread_join,那麼它的資源就不會得到釋放,反過來說,這樣的執行緒必須有別的執行緒對它進行pthread_join,否則就是出現記憶體洩露。但是還有一些執行緒,更喜歡自己來清理退出的狀態,他們也不願意主執行緒呼叫pthread_join來等待他們。我們將這一類執行緒的屬性稱為detached。如果我們在呼叫pthread_create()函式的時候將屬性設定為NULL,則表明我們希望所建立的執行緒採用預設的屬性,也就是joinable。如果需要將屬性設定為detached,

pthread_attr_t attr;
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
可以通過這段程式碼來構造一個detached執行緒的屬性,在呼叫pthread_create的時候傳遞這個屬性就可以了。除此之外我們可以呼叫pthread_detach對已經建立的

joinable且還沒有呼叫pthread_join的執行緒設成detached,但如果執行緒已經呼叫了join,那麼再呼叫detach是無效的,且需要注意的是你不能用pthread_join將一個

detached執行緒設成可joinable的。

#include <stdio.h>
#include <unistd.h>
#include <pthread.h>
#include <string.h>

void *thread_fun(void *param)
{
    for ( int i = 0; i < 10; ++i)
    {
        printf("this is thread fun %d...\n", i);
    }
    const static char *p = "I'am finish, thank you call me...";
    return (void *)p;
}

int main()
{
    pthread_t tid;
    pthread_attr_t attr;
    pthread_attr_init(&attr);
    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
    if ( 0 != pthread_create(&tid, &attr, thread_fun, NULL) )
    {
        printf("create thread failed!\n");
    }
    for ( int i = 0; i < 10; ++i)
    {
        printf("this is main function %d ...\n", i);
    }

    char *p = NULL;
    if (0 != pthread_join(tid, (void **)&p) )
    {
        printf("join failed!\n");
    }
    printf("all finish! thank you! %s\n", p);
}



雖然呼叫pthread_join來等待執行緒結束,但由於執行緒是detached的,所以join失敗。所以一旦執行緒設成了detached,那就它就沒辦法再回到joinable狀態了。

3、取消執行緒pthread_cancel

這個函式可以向目標執行緒傳送一個終止訊號,感覺應該比較有用。目前我的理解就是一個執行緒可以向另一個執行緒傳送cancel訊號,請求退出,當然這也要看那個執行緒願不願意,所以這裡還有一個函式

int pthread_setcancelstate(int state, int *oldstate)

int pthread_cancel(pthread_t thread)

傳送終止訊號給thread執行緒,如果成功則返回0,否則為非0值。傳送成功並不意味著thread會終止。

int pthread_setcancelstate(int state, int *oldstate)設定本執行緒對Cancel訊號的反應,state有兩種值:PTHREAD_CANCEL_ENABLE(預設)和 PTHREAD_CANCEL_DISABLE,分別表示收到訊號後設為CANCLED狀態和忽略CANCEL訊號繼續執行;old_state如果不為 NULL則存入原來的Cancel狀態以便恢復。

int pthread_setcanceltype(int type, int *oldtype)

設定本執行緒取消動作的執行時機,type由兩種取值:PTHREAD_CANCEL_DEFERRED和 PTHREAD_CANCEL_ASYNCHRONOUS,僅當Cancel狀態為Enable時有效,分別表示收到訊號後繼續執行至下一個取消點再退出和立即執行取消動作(退出);oldtype如果不為NULL則存入原來的取消動作型別值。至於什麼叫“下一個取消點”,我目前還不知道。綜上,要似pthread_cancel生效,需要三步走,首先你自己要能響應cancel的訊號,所以需要用pthread_setcancelstate來設定cancel的狀態。其次你對cancel設成enable之後你還的指定什麼時候退出,是立即退出呢還是等到所謂的下一個取消點再退出。最後,無論你把狀態設定的怎麼完美,最重要的是有人給你傳送cancel訊號,也就是別的執行緒呼叫pthread_cancel。瞭解了這些,看個例子。

#include <stdio.h>
#include <unistd.h>
#include <pthread.h>
#include <string.h>

void *thread_fun(void *param)
{
    if ( 0 != pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL) )
    {
        printf("set cancel state failed!\n");
    }
    if ( 0 != pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL) )
    {
        printf("set cancel type failed!\n");
    }
    for ( int i = 0; i < 500; ++i)
    {
        printf("this is thread fun %d...\n", i);
    }
}

int main()
{
    pthread_t tid;
    if ( 0 != pthread_create(&tid, NULL, thread_fun, NULL) )
    {
        printf("create thread failed!\n");
    }
    for ( int i = 0; i < 5; ++i)
    {
        printf("this is main function %d ...\n", i);
    }
    if ( 0 != pthread_cancel(tid) )
    {
        printf("cancel failed!\n");
    }
    getchar();
}

由於輸出結果比較多,就不貼圖了。注意看,最後用了個getchar()來阻塞main函式,假如沒有對執行緒進行cancel處理,那麼由於main阻塞,執行緒會執行結束。但加了cancel處理之後,執行緒會立刻結束。

根據我的經驗,我覺得這樣的場景還是比較常見的。執行緒的建立取消就說到這兒,估計差不多夠用了,再說說同步問題。linux執行緒同步方法有互斥鎖、條件變數、訊號量,其實這些windows都有。

1、互斥鎖。

先來一個直白的理解。我要訪問一個公共資源,那麼可以設定一個標誌量,當有人使用的時候讓她設為1,沒人使用的時候讓它設為0。這看看起來是完美的方法(事實也是完美

的辦法),但關鍵的問題在於併發,你無法保證當你把標誌量設為1的過程中恰好也有別的執行緒在使用這個標誌量。所以關鍵的問題在於怎麼樣在我使用標誌量的時候別的執行緒

無法訪問,答案是“原子操作”。使用者無法實現原子操作,所以系統提供了。

有兩種方法建立互斥鎖,靜態方式和動態方式。

POSIX定義了一個巨集PTHREAD_MUTEX_INITIALIZER來靜態初始化互斥鎖,方法如下: pthread_mutex_t mutex=PTHREAD_MUTEX_INITIALIZER; 在LinuxThreads實現中,pthread_mutex_t是一個結構,而PTHREAD_MUTEX_INITIALIZER則是一個結構常量。

動態方式是採用pthread_mutex_init()函式來初始化互斥鎖,API定義如下: int pthread_mutex_init(pthread_mutex_t *mutex, constpthread_mutexattr_t *mutexattr) 其中mutexattr用於指定互斥鎖屬性(見下),如果為NULL則使用預設屬性。

pthread_mutex_destroy()用於登出一個互斥鎖,API定義如下: int pthread_mutex_destroy(pthread_mutex_t *mutex) 銷燬一個互斥鎖即意味著釋放它所佔用的資源,且要求鎖當前處於開放狀態。由於在Linux中,互斥鎖並不佔用任何資源,因此LinuxThreads中的pthread_mutex_destroy()除了檢查鎖狀態以外(鎖定狀態則返回EBUSY)沒有其他動作。

互斥鎖屬性:
互斥鎖的屬性在建立鎖的時候指定,在LinuxThreads實現中僅有一個鎖型別屬性,不同的鎖型別在試圖對一個已經被鎖定的互斥鎖加鎖時表現不同。有四個值可供選擇:
PTHREAD_MUTEX_TIMED_NP,這是預設值,也就是普通鎖。當一個執行緒加鎖以後,其餘請求鎖的執行緒將形成一個等待佇列,並在解鎖後按優先順序獲得鎖。這種鎖策略保證了資源分配的公平性。
PTHREAD_MUTEX_RECURSIVE_NP,巢狀鎖,允許同一個執行緒對同一個鎖成功獲得多次,並通過多次unlock解鎖。如果是不同執行緒請求,則在加鎖執行緒解鎖時重新競爭。
PTHREAD_MUTEX_ERRORCHECK_NP,檢錯鎖,如果同一個執行緒請求同一個鎖,則返回EDEADLK,否則與PTHREAD_MUTEX_TIMED_NP型別動作相同。這樣就保證當不允許多次加鎖時不會出現最簡單情況下的死鎖。
PTHREAD_MUTEX_ADAPTIVE_NP,適應鎖,動作最簡單的鎖型別,僅等待解鎖後重新競爭。

加鎖。對共享資源的訪問,要對互斥量進行加鎖,如果互斥量已經上了鎖,呼叫執行緒會阻塞,直到互斥量被解鎖。
int pthread_mutex_lock(pthread_mutex *mutex);
int pthread_mutex_trylock(pthread_mutex_t *mutex);
解鎖。在完成了對共享資源的訪問後,要對互斥量進行解鎖。
int pthread_mutex_unlock(pthread_mutex_t *mutex);
銷燬鎖。鎖在是使用完成後,需要進行銷燬以釋放資源。
int pthread_mutex_destroy(pthread_mutex *mutex);

下面仿照孫鑫講解多執行緒時的模擬火車票訂購

#include <stdio.h>
#include <unistd.h>
#include <pthread.h>
#include <string.h>

int tickets = 0;
void *thread_fun(void *param)
{
    int index = *(int *)param;
    while (tickets < 10)
    {
        printf("%d seal ticket %d\n", index, tickets);
        tickets += 1;
    }
}

int main()
{
    pthread_t tid[4];
    for ( int i = 0; i < 4; ++i )
    {
        if ( 0 != pthread_create(&tid[i], NULL, thread_fun, &i))
        {
            printf("create thread failed!");
        }
    }
    getchar();
}


對於輸出結果,很明顯可以知道是錯的,注意在程式碼的main中並沒有用pthread_join而是在最後用了getchar()來阻塞,因為pthread_join會讓main執行緒阻塞,執行了第一條join後面的join就要就要等被阻塞。

對於上面問題的分析很簡單,4個執行緒同時訪問了一個tickets公共變數而沒有任何加鎖同步措施,接下來用mutex改進。

#include <stdio.h>
#include <unistd.h>
#include <pthread.h>
#include <string.h>

int tickets = 0;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
void *thread_fun(void *param)
{
    int index = *(int *)param;
    while (true)
    {
        pthread_mutex_lock(&mutex);
        if ( tickets > 10 )
        {
            return NULL;
        }
        printf("%d seal ticket %d\n", index, tickets);
        tickets += 1;
        pthread_mutex_unlock(&mutex);
    }
}

int main()
{
    pthread_t tid[4];
    for ( int i = 0; i < 4; ++i )
    {
        if ( 0 != pthread_create(&tid[i], NULL, thread_fun, &i))
        {
            printf("create thread failed!");
        }
    }
    getchar();
}


由結果可以知道,整個邏輯是對的,但是為什麼全是4?因為只迴圈10次,你可以把次數加大看看。你有沒有發現我建立執行緒的時候是for(int i = 0 ; i < 4; ++i)而給執行緒傳遞的引數是i,也就是說執行緒接收到的引數是在[0, 4)這個區間內,怎麼會有4???答案是這樣的,線上程中使用了int index = *(int *)param來獲取傳遞的引數,很明顯,是根據地址獲取的,而這個地址就是i的地址,那麼在呼叫pthread_create之後index並沒有被馬上賦值,而是main中的迴圈繼續執行,這個時候i已經遞增了。這就是執行緒傳遞引數的問題,我通常的做法是建立執行緒後sleep一下,讓新執行緒有足夠的時間去拷貝引數,當然,這種做法是不可取的,正確的做法應該是在傳遞引數的時候用同步機制,當然這是後話。

如果pthread_mutex_lock獲取鎖失敗,則習執行緒被阻塞,而是用pthread_mutex_trylock時會馬上返回,根據返回值判斷是否加鎖成功,如果不成功,可以列印相應資訊。

2、條件變數

看了一下條件變數,實在沒看懂。下面是別人部落格上的:

互斥鎖不同,條件變數是用來等待而不是用來上鎖的。條件變數用來自動阻塞一個執行緒,直到某特殊情況發生為止。通常條件變數和互斥鎖同時使用。條件變數分為兩部分: 條件和變數。條件本身是由互斥量保護的。執行緒在改變條件狀態前先要鎖住互斥量。條件變數使我們可以睡眠等待某種條件出現。條件變數是利用執行緒間共享的全域性變數進行同步的一種機制,主要包括兩個動作:一個執行緒等待"條件變數的條件成立"而掛起;另一個執行緒使"條件成立"(給出條件成立訊號)。條件的檢測是在互斥鎖的保護下進行的。如果一個條件為假,一個執行緒自動阻塞,並釋放等待狀態改變的互斥鎖。如果另一個執行緒改變了條件,它發訊號給關聯的條件變數,喚醒一個或多個等待它的執行緒,重新獲得互斥鎖,重新評價條件。如果兩程序共享可讀寫的記憶體,條件變數可以被用來實現這兩程序間的執行緒同步。

初始化條件變數。
靜態態初始化,pthread_cond_t cond = PTHREAD_COND_INITIALIER;
動態初始化,int pthread_cond_init(pthread_cond_t *cond, pthread_condattr_t *cond_attr);
等待條件成立。釋放鎖,同時阻塞等待條件變數為真才行。timewait()設定等待時間,仍未signal,返回ETIMEOUT(加鎖保證只有一個執行緒wait)
int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);
int pthread_cond_timewait(pthread_cond_t *cond,pthread_mutex *mutex,const timespec *abstime);
啟用條件變數。pthread_cond_signal,pthread_cond_broadcast(啟用所有等待執行緒)
int pthread_cond_signal(pthread_cond_t *cond);
int pthread_cond_broadcast(pthread_cond_t *cond); //解除所有執行緒的阻塞
清除條件變數。無執行緒等待,否則返回EBUSY
int pthread_cond_destroy(pthread_cond_t *cond);

我的疑惑在於在使用條件變數的時候為什麼還要一個互斥量,而且奇怪的是在cont_wait的時候mutex是作為引數的,但cond_signal的時候沒有mutex作為引數。“cond一旦進入wai t就會自動release mutex,當有訊號是cond 會自動獲取mutex。wait 內部操作:一進入wait 就unlock,在wait 結束前就lock”。只能這麼理解了。。。

thread1:
pthread_mutex_lock(&mutex);
pthread_cond_wait(&cond, &mutex)
pthread_mutex_unlock(&mutex)

thread2
pthread_mutex_lock(&mutex);
pthread_cond_signal(&cond)
pthread_mutex_unlock(&mutex)

這個地方就不舉例了,條件變數我還是理解的不深。

3、訊號量

在前面討論過一個程序可以給另一個程序傳送cancel訊號而讓它退出,那麼執行緒呢?執行緒能不能這樣做?

#include<semaphore.h>

函式原型
int sem_init(sem_t *sem, int pshared, unsigned int value);
說明
sem_init() 初始化一個定位在 sem 的匿名訊號量。value 引數指定訊號量的初始值。 pshared 引數指明訊號量是由程序內執行緒共享,還是由程序之間共享。如果 pshared 的值為 0,那麼訊號量將被程序內的執行緒共享,並且應該放置在這個程序的所有執行緒都可見的地址上(如全域性變數,或者堆上動態分配的變數)。
由此可以看出,訊號量不僅可以用來做執行緒間的同步,還可以做程序間的同步。

下面在介紹sem_wait和sem_post兩個操作函式

#include <semaphore.h>
int sem_wait(sem_t * sem);
int sem_post(sem_t * sem);

這兩個函式的形式完全一樣。下面是摘自百度百科的一段話:

sem_wait函式也是一個原子操作,它的作用是從訊號量的值減去一個“1”,但它永遠會先等待該訊號量為一個非零值才開始做減法。也就是說,如果你對一個值為2的訊號量呼叫sem_wait(),執行緒將會繼續執行,這訊號量的值將減到1。如果對一個值為0的訊號量呼叫sem_wait(),這個函式就 會地等待直到有其它執行緒增加了這個值使它不再是0為止。如果有兩個執行緒都在sem_wait()中等待同一個訊號量變成非零值,那麼當它被第三個執行緒增加 一個“1”時,等待執行緒中只有一個能夠對訊號量做減法並繼續執行,另一個還將處於等待狀態。

從這兒可以得到兩點:首先sem_wait是個原子操作,素所以不需要額外的同步措施,前面說的條件變數在使用時需要另外的一個mutex,我懷疑是pthread_cond_wait不是原子操作,所以才需要另外的一個mutex。另外一點sem_wait是阻塞的,如果要用非阻塞,可以用sem_trywait()執行成功返回0,執行失敗返回 -1且訊號量的值保持不變,此外還有sem_timewait()表示阻塞一定的時間。

再看看sem_post

#include <semaphore.h>
int sem_post(sem_t *sem);
sem_post函式的作用是給訊號量的值加上一個“1”,它是一個“原子操作”---即同時對同一個訊號量做加“1”操作的兩個執行緒是不會衝突的;而同時對同一個檔案進行讀、加和寫操作的兩個程式就有可能會引起衝突。訊號量的值永遠會正確地加一個“2”--因為有兩個執行緒試圖改變它。 當有執行緒阻塞在這個訊號量上時,呼叫這個函式會使其中一個執行緒不在阻塞,選擇機制是有執行緒的排程策略決定的。
sem_post() 成功時返回 0;錯誤時,訊號量的值沒有更改,-1 被返回,並設定 errno 來指明錯誤。

這也是百度百科的一段話,由此可見,sem_post也是原子操作,如果把訊號量的初始值設成1,它就退化成互斥量了。

#include <stdio.h>
#include <unistd.h>
#include <pthread.h>
#include <string.h>
#include <semaphore.h>

int tickets = 0;
sem_t sem;

void *thread_fun(void *param)
{
    int index = *(int *)param;
    while (true)
    {
        sem_wait(&sem);
        if ( tickets > 10 )
        {
            return NULL;
        }
        printf("%d seal ticket %d\n", index, tickets);
        tickets += 1;
        sem_post(&sem);
    }
}

int main()
{
    if ( 0 != sem_init(&sem, 0, 1 ) )
    {
        printf("init semsphore failed!\n");
        return 0;
    }
    pthread_t tid[4];
    for ( int i = 0; i < 4; ++i )
    {
        if ( 0 != pthread_create(&tid[i], NULL, thread_fun, &i))
        {
            printf("create thread failed!");
        }
    }
    getchar();
}