1. 程式人生 > >【Linux C 多執行緒程式設計】互斥鎖與條件變數

【Linux C 多執行緒程式設計】互斥鎖與條件變數

一、互斥鎖

互斥量從本質上說就是一把鎖, 提供對共享資源的保護訪問。

  1. 初始化:

  在Linux下, 執行緒的互斥量資料型別是pthread_mutex_t. 在使用前, 要對它進行初始化:

  對於靜態分配的互斥量, 可以把它設定為PTHREAD_MUTEX_INITIALIZER, 或者呼叫pthread_mutex_init.

  對於動態分配的互斥量, 在申請記憶體(malloc)之後, 通過pthread_mutex_init進行初始化, 並且在釋放記憶體(free)前需要呼叫pthread_mutex_destroy.

  原型:

  int pthread_mutex_init(pthread_mutex_t *restrict

mutex, const pthread_mutexattr_t *restric attr);

  int pthread_mutex_destroy(pthread_mutex_t *mutex);

  標頭檔案:

  返回值: 成功則返回0, 出錯則返回錯誤編號.

  說明: 如果使用預設的屬性初始化互斥量, 只需把attr設為NULL. 其他值在以後講解。

  2. 互斥操作:

  對共享資源的訪問, 要對互斥量進行加鎖, 如果互斥量已經上了鎖, 呼叫執行緒會阻塞, 直到互斥量被解鎖. 在完成了對共享資源的訪問後, 要對互斥量進行解鎖。

  首先說一下加鎖函式:

  標頭檔案:

  原型:

  int pthread_mutex_lock(pthread_mutex_t *

mutex);

  int pthread_mutex_trylock(pthread_mutex_t *mutex);

  返回值: 成功則返回0, 出錯則返回錯誤編號.

  說明: 具體說一下trylock函式, 這個函式是非阻塞呼叫模式, 也就是說, 如果互斥量沒被鎖住, trylock函式將把互斥量加鎖, 並獲得對共享資源的訪問許可權; 如果互斥量被鎖住了, trylock函式將不會阻塞等待而直接返回EBUSY, 表示共享資源處於忙狀態。

  再說一下解所函式:

  標頭檔案:

  原型: int pthread_mutex_unlock(pthread_mutex_t *mutex);

  返回值: 成功則返回0, 出錯則返回錯誤編號.

  3. 死鎖:

  死鎖主要發生在有多個依賴鎖存在時, 會在一個執行緒試圖以與另一個執行緒相反順序鎖住互斥量時發生. 如何避免死鎖是使用互斥量應該格外注意的東西。

  總體來講, 有幾個不成文的基本原則:

  對共享資源操作前一定要獲得鎖。

  完成操作以後一定要釋放鎖。

  儘量短時間地佔用鎖。

  如果有多鎖, 如獲得順序是ABC連環扣, 釋放順序也應該是ABC。

  執行緒錯誤返回時應該釋放它所獲得的鎖。

下面給個測試小程式進一步瞭解互斥,mutex互斥訊號量鎖住的不是一個變數,而是阻塞住一段程式。如果對一個mutex變數testlock, 執行了第一次pthread_mutex_lock(testlock)之後,在unlock(testlock)之前的這段時間內,如果有其他執行緒也執行到了pthread_mutex_lock(testlock),這個執行緒就會阻塞住,直到之前的執行緒unlock之後才能執行,由此,實現同步,也就達到保護臨界區資源的目的。
#include<stdio.h>
#include<pthread.h>

static pthread_mutex_t testlock;
pthread_t test_thread;

void *test()
{
pthread_mutex_lock(&testlock);
printf("thread Test() \n");
pthread_mutex_unlock(&testlock);
}

int main()
{
pthread_mutex_init(&testlock, NULL);
pthread_mutex_lock(&testlock);

printf("Main lock \n");

pthread_create(&test_thread, NULL, test, NULL);
sleep(1); //更加明顯的觀察到是否執行了建立執行緒的互斥鎖
printf("Main unlock \n");
pthread_mutex_unlock(&testlock);

sleep(1);

pthread_join(test_thread,NULL);
pthread_mutex_destroy(&testlock);
return 0;
}

make
gcc -D_REENTRANT -lpthread -o test test.c

結果:
Main lock
Main unlock
thread Test()

二、條件變數
這裡主要說說 pthread_cond_wait()的用法,在下面有說明。
條件變數是利用執行緒間共享的全域性變數進行同步的一種機制,主要包括兩個動作:個執行緒等待"條件變數的條件成立"而掛起;另一個執行緒使"條件成立"(給出條件成立訊號)。為了防止競爭,條件變數的使用總是和一個互斥鎖結合在一起。  
   
1.   建立和登出  

條件變數和互斥鎖一樣,都有靜態動態兩種建立方式,靜態方式使用PTHREAD_COND_INITIALIZER常量,如下:    
pthread_cond_t   cond=PTHREAD_COND_INITIALIZER    

動態方式呼叫pthread_cond_init()函式,API定義如下:    
int   pthread_cond_init(pthread_cond_t   *cond,   pthread_condattr_t   *cond_attr)    
   
儘管POSIX標準中為條件變數定義了屬性,但在LinuxThreads中沒有實現,因此cond_attr值通常為NULL,且被忽略。  

登出一個條件變數需要呼叫pthread_cond_destroy(),只有在沒有執行緒在該條件變數上等待的時候才能登出這個條件變數,否則返回EBUSY。因為Linux實現的條件變數沒有分配什麼資源,所以登出動作只包括檢查是否有等待執行緒。API定義如下:    
int   pthread_cond_destroy(pthread_cond_t   *cond)    

2.   等待和激發  

int   pthread_cond_wait(pthread_cond_t   *cond,   pthread_mutex_t   *mutex)  
int   pthread_cond_timedwait(pthread_cond_t   *cond,   pthread_mutex_t   *mutex,   const   struct   timespec   *abstime)    



等待條件有兩種方式:無條件等待pthread_cond_wait()和計時等待pthread_cond_timedwait(),其中計時等待方式如果在給定時刻前條件沒有滿足,則返回ETIMEOUT,結束等待,其中abstime以與time()系統呼叫相同意義的絕對時間形式出現,0表示格林尼治時間1970年1月1日0時0分0秒。  

無論哪種等待方式,都必須和一個互斥鎖配合,以防止多個執行緒同時請求pthread_cond_wait()(或pthread_cond_timedwait(),下同)的競爭條件(Race   Condition)。mutex互斥鎖必須是普通鎖(PTHREAD_MUTEX_TIMED_NP)或者適應鎖(PTHREAD_MUTEX_ADAPTIVE_NP),且在呼叫pthread_cond_wait()前必須由本執行緒加鎖(pthread_mutex_lock()),而在更新條件等待佇列以前,mutex保持鎖定狀態,並在執行緒掛起進入等待前解鎖。在條件滿足從而離開pthread_cond_wait()之前,mutex將被重新加鎖,以與進入pthread_cond_wait()前的加鎖動作對應。   執行pthread_cond_wait()時自動解鎖互斥量(如同執行了 pthread_unlock_mutex),並等待條件變數觸發。這時執行緒掛起,不佔用 CPU 時間,直到條件變數被觸發。

因此,全過程可以描述為:

(1)pthread_mutex_lock()上鎖,

(2)pthread_cond_wait()等待,等待過程分解為為:解鎖--條件滿足--加鎖

(3)pthread_mutex_unlock()解鎖。  
激發條件有兩種形式,pthread_cond_signal()啟用一個等待該條件的執行緒,存在多個等待執行緒時按入隊順序啟用其中一個;而pthread_cond_broadcast()則啟用所有等待執行緒。 兩者 如果沒有等待的執行緒,則什麼也不做。

下面一位童鞋問的問題解釋了上面的說明:

當pthread_cond_t呼叫pthread_cond_wait進入等待狀態時,pthread_mutex_t互斥訊號無效了.

示例程式碼如下:

//多執行緒同步--條件鎖(相當與windows的事件)測試

//要先讓pthread_cond_wait進入等待訊號狀態,才能呼叫pthread_cond_signal傳送訊號,才有效.

//不能讓pthread_cond_signal在pthread_cond_wait前面執行

#include <stdio.h>

#include<pthread.h> //多執行緒所用標頭檔案

#include <semaphore.h> //訊號量使用標頭檔案

pthread_cond_t g_cond /*=PTHREAD_MUTEX_INITIALIZER*/; //申明條鎖,並用巨集進行初始化

pthread_mutex_t g_mutex ;

//執行緒執行函式

void threadFun1(void)

{

int i;

pthread_mutex_lock(&g_mutex); //1

pthread_cond_wait(&g_cond,&g_mutex); //如g_cond無訊號,則阻塞

for( i = 0;i < 2; i++ ){

printf("thread threadFun1.\n");

sleep(1);

}

pthread_cond_signal(&g_cond);

pthread_mutex_unlock(&g_mutex);

}

int main(void)

{

pthread_t id1; //執行緒的識別符號

pthread_t id2;

pthread_cond_init(&g_cond,NULL); //也可以程式裡面初始化

pthread_mutex_init(&g_mutex,NULL); //互斥變數初始化

int i,ret;

ret = pthread_create(&id1,NULL,(void *)threadFun1, NULL);

if ( ret!=0 ) { //不為0說明執行緒建立失敗

printf ("Create pthread1 error!\n");

exit (1);

}

sleep(5); //等待子執行緒先開始

pthread_mutex_lock(&g_mutex); //2

pthread_cond_signal(&g_cond); //給個開始訊號,注意這裡要先等子執行緒進入等待狀態在發訊號,否則無效

pthread_mutex_unlock(&g_mutex);

pthread_join(id1,NULL);

pthread_cond_destroy(&g_cond); //釋放

pthread_mutex_destroy(&g_mutex); //釋放

return 0;

}

大家請看紅顏色的1和2.

明明是1先鎖了互斥變數,但程式碼執行到2還是一樣可以鎖定.

為什麼會這樣呢????/

pthread_cond_wait()什麼情況才會接鎖,繼續跑下去啊...現在來看一段典型的應用:看註釋即可。

問題解釋:當程式進入pthread_cond_wait等待後,將會把g_mutex進行解鎖,當離開pthread_cond_wait之前,g_mutex會重新加鎖。所以在main中的g_mutex會被加鎖。 呵呵。。。

現在來看一段典型的應用:看註釋即可。

#include <pthread.h>   
#include <unistd.h>   

static pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;   
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;   

struct node {   
int n_number;   
struct node *n_next;   
} *head = NULL;   

/*[thread_func]*/   
static void cleanup_handler(void *arg)   
{   
printf("Cleanup handler of second thread.\n");   
free(arg);   
(void)pthread_mutex_unlock(&mtx);   
}   

static void *thread_func(void *arg)   
{   
struct node *p = NULL;   

pthread_cleanup_push(cleanup_handler, p);   
while (1) {   
pthread_mutex_lock(&mtx);           //這個mutex主要是用來保證pthread_cond_wait的併發性  
while (head == NULL)   {               //這個while要特別說明一下,單個pthread_cond_wait功能很完善,為何這裡要有一個while (head == NULL)呢?因為pthread_cond_wait裡的執行緒可能會被意外喚醒,如果這個時候head != NULL,則不是我們想要的情況。這個時候,應該讓執行緒繼續進入pthread_cond_wait  
pthread_cond_wait(&cond, &mtx);         // pthread_cond_wait會先解除之前的pthread_mutex_lock鎖定的mtx,然後阻塞在等待對列裡休眠,直到再次被喚醒(大多數情況下是等待的條件成立而被喚醒,喚醒後,該程序會先鎖定先pthread_mutex_lock(&mtx);,再讀取資源, 用這個流程是比較清楚的/*block-->unlock-->wait() return-->lock*/
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/errno.h>
#include <sys/types.h>
#include <signal.h>
#include <pthread.h>

#define    min(a,b)    ((a) < (b) ? (a) : (b))
#define    max(a,b)    ((a) > (b) ? (a) : (b))

#define    MAXNITEMS         1000000
#define    MAXNTHREADS            100

int        nitems;            /* read-only by producer and consumer */
struct {
pthread_mutex_t    mutex;
int    buff[MAXNITEMS];
int    nput;
int    nval;
} shared = { PTHREAD_MUTEX_INITIALIZER };

void    *produce(void *), *consume(void *);

/* include main */
int
main(int argc, char **argv)
{
int            i, nthreads, count[MAXNTHREADS];
pthread_t    tid_produce[MAXNTHREADS], tid_consume;

if (argc != 3) {
printf("usage: prodcons3 <#items> <#threads>\n");
return -1;
}

nitems = min(atoi(argv[1]), MAXNITEMS);
nthreads = min(atoi(argv[2]), MAXNTHREADS);

/* 4create all producers and one consumer */

for (i = 0; i < nthreads; i++) {
count[i] = 0;
pthread_create(&tid_produce[i], NULL, produce, &count[i]);
}
pthread_create(&tid_consume, NULL, consume, NULL);

/* 4wait for all producers and the consumer */
for (i = 0; i < nthreads; i++) {
pthread_join(tid_produce[i], NULL);
printf("count[%d] = %d\n", i, count[i]);    
}
pthread_join(tid_consume, NULL);

exit(0);
}
/* end main */

void *
produce(void *arg)
{
for ( ; ; ) {
pthread_mutex_lock(&shared.mutex);
if (shared.nput >= nitems) {
pthread_mutex_unlock(&shared.mutex);
return(NULL);        /* array is full, we're done */
}
shared.buff[shared.nput] = shared.nval;
shared.nput++;
shared.nval++;
pthread_mutex_unlock(&shared.mutex);
*((int *) arg) += 1;
}
}

/* include consume */
void
consume_wait(int i)
{
for ( ; ; ) {
pthread_mutex_lock(&shared.mutex);
if (i < shared.nput) {
pthread_mutex_unlock(&shared.mutex);
return;            /* an item is ready */
}
pthread_mutex_unlock(&shared.mutex);
}
}

void *
consume(void *arg)
{
int        i;

for (i = 0; i < nitems; i++) {
consume_wait(i);
if (shared.buff[i] != i)
printf("buff[%d] = %d\n", i, shared.buff[i]);
}
return(NULL);
}