轉載請註明出處:http://blog.csdn.net/suool/article/details/38582521.

基本概念與原理

互斥鎖能夠解決資源的互斥訪問,但是在某些情況下,互斥並不能解決問題,比如兩個執行緒需 要互斥的處理各自的操作,但是一個執行緒的操作僅僅存在一種條件成立的情況下執行,一旦錯過不可再重現,由於執行緒間相互爭奪cpu資源,因此在條件成立的時候,該執行緒不一定爭奪到cpu而錯過,導致永遠得不到執行.....

因此需要某個機制來解決此問題,更重要的是,執行緒僅僅只有一種情況需要執行操作,不停地申請資源導致資源浪費,而Linux提供了條件變數加互斥鎖解決該問題.

條件變數變數也是出自POSIX執行緒標準,另一種執行緒同步機制,。主要用來等待某個條件的發生。可以用來同步同一程序中的各個執行緒。當然如果一個條件變數存放在多個程序共享的某個記憶體區中,那麼還可以通過條件變數來進行程序間的同步。

每個條件變數總是和一個互斥量相關聯,條件本身是由互斥量保護的,執行緒在改變條件狀態之間必須要鎖住互斥量。條件變數相對於互斥量最大的優點在於允許執行緒以無競爭的方式等待條件的發生。當一個執行緒獲得互斥鎖後,發現自己需要等待某個條件變為真,如果是這樣,該執行緒就可以等待在某個條件上,這樣就不需要通過輪詢的方式來判斷新增,大大節省了CPU時間。

在互斥量一文中說過互斥量是用於上鎖,而不是用於等待現在這句話可以加強為:互斥量是用於上鎖,條件變數用於等待

條件變數宣告為pthread_cond_t資料型別,在<bits/pthreadtypes.h>中有具體的定義。

基本操作

條件變數初始化和銷燬

/* Initialize condition variable  */
int pthread_cond_init (pthread_cond_t *__restrict __cond,   // 指向要初始化的變數的指標
                              __const pthread_condattr_t *__restrict __cond_attr) ;// 屬性

/* Destroy condition variable */
int pt<span style="font-size:12px;">hread_cond_destroy (pthread_cond_t *__cond);</span>
上面兩個函式分別由於條件變數的初始化和銷燬。

和互斥量的初始化一樣,如果條件變數是靜態分配的,可以通過常量進行初始化,如下:

<span style="font-size:12px;">pthread_cond_t mlock = PTHREAD_COND_INITIALIZER;</span>
也可以通過pthread_cond_init()進行初始化,對於動態分配的條件變數由於不能直接賦值進行初始化,就只能採用這種方式進行初始化。那麼當不在需要使用條件變數時,需要呼叫pthread_cond_destroy()銷燬該條件所佔用的資源。

條件變數的屬性設定

/* 初始化條件變數屬性物件  */
int pthread_condattr_init (pthread_condattr_t *__attr);

/* 銷燬條件變數屬性物件  */
int pthread_condattr_destroy (pthread_condattr_t *__attr);

/* 獲取條件變數屬性物件在程序間共享與否的標識  */
int pthread_condattr_getpshared (__const pthread_condattr_t * __restrict __attr,
                                        int *__restrict __pshared);

/* 設定條件變數屬性物件,標識在程序間共享與否 */
int pthread_condattr_setpshared (pthread_condattr_t *__attr, int __pshared) ;

這個屬性的設定和互斥量屬性設定是一樣的,具體使用可以參考互斥量的用法:互斥量的屬性設定

條件變數的申請使用

/<span style="font-family:SimSun;font-size:12px;">*  等待條件變為真 */
 int pthread_cond_wait (pthread_cond_t *__restrict __cond,
                              pthread_mutex_t *__restrict __mutex);

 /* 限時等待條件為真 */
 int pthread_cond_timedwait (pthread_cond_t *__restrict __cond,
                                   pthread_mutex_t *__restrict __mutex,
                                   __const struct timespec *__restrict __abstime);

 /* 喚醒一個等待條件的執行緒.  */
 int pthread_cond_signal (pthread_cond_t *__cond);

 /* 喚醒等待該條件的所有執行緒 */
 int pthread_cond_broadcast (pthread_cond_t *__cond);</span>
(1)pthread_cond_wait()函式用於等待條件被觸發。該函式傳入兩個引數,一個條件變數一個互斥量,函式將條件變數和互斥量進行關聯,互斥量對該條件進行保護,傳入的互斥量必須是已經鎖住的。呼叫pthread_cond_wait()函式後,會原子的執行以下兩個動作:
  • 將呼叫執行緒放到等待條件的執行緒列表上,即進入睡眠;
  • 對互斥量進行解鎖;

由於這兩個操作時原子操作,這樣就關閉了條件檢查和執行緒進入睡眠等待條件改變這兩個操作之間的時間通道,這樣就不會錯過任何條件的變化。

當pthread_cond_wait()返回後,互斥量會再次被鎖住。

(2)pthread_cond_timedwait()函式和pthread_cond_wait()的工作方式相似,只是多了一個等待時間。等待時間的結構為struct timespec,

<span style="font-size:12px;">    struct timespec{  
    time_t  tv_sec    //Seconds.  
    long    tv_nsec   //Nanoseconds.  
    };  </span>

函式要求傳入的時間值是一個絕對值,不是相對值,例如,想要等待3分鐘,必須先獲得當前時間,然後加上3分鐘。

要想獲得當前系統時間的timespec值,沒有直接可呼叫的函式,需要通過呼叫gettimeofday函式獲取timeval結構,然後轉換成timespec結構,轉換公式就是:

<span style="font-size:12px;">timeSpec.tv_sec = timeVal.tv_sec;
timeSpec.tv_nsec = timeVal.tv_usec * 1000;</span>

所以要等待3分鐘,timespec時間結構的獲得應該如下所示:
<span style="font-size:12px;">    struct timeval now;  
    struct timespec until;  
    gettimeofday(&now);//獲得系統當前時間  
      
    //把時間從timeval結構轉換成timespec結構  
    until.tv_sec = now.tv_sec;  
    until.tv_nsec = now.tv_usec * 1000;  
      
    //增加min  
    until.tv_sec += 3 * 60;  </span>

如果時間到後,條件還沒有發生,那麼會返回ETIMEDOUT錯誤。

從pthread_cond_wait()和pthread_cond_timewait()成功返回時,執行緒需要重新計算條件,因為其他執行緒可能在執行過程中已經改變條件。

(3)pthread_cond_signal() & pthread_cond_broadcast()

這兩個函式都是用於向等待條件的執行緒傳送喚醒訊號,pthread_cond_signal()函式只會喚醒等待該條件的某個執行緒,pthread_cond_broadcast()會廣播條件狀態的改變,以喚醒等待該條件的所有執行緒。例如多個執行緒只讀共享資源,這是可以將它們都喚醒。

這裡要注意的是:一定要在改變條件狀態後,再給執行緒傳送訊號。

考慮條件變數訊號單播發送和廣播發送的一種候選方式是堅持使用廣播發送。只有在等待者程式碼編寫確切,只有一個等待者需要喚醒,且喚醒哪個執行緒無所謂,那麼此時為這種情況使用單播,所以其他情況下都必須使用廣播發送。

使用示例

下面的程式使用條件變數和互斥鎖實現生產者消費者問題.整個臨時儲存空間為2.因此,如果臨時空間已滿,則阻塞生產程序,若果沒有產品則阻塞消費者執行緒.

應用程式申請了三個物件:

1)一個互斥鎖物件,配合條件變數使用

2)一個空間非空條件變數,消費者執行緒在空間沒有產品時等待這一條件變數;生產者在生成產品後通知此變數

3)一個空間非滿條件變數,生成執行緒在空間滿時等待著這一條件變數;消費者執行緒在消費產品後通知此變數

生產線流程如下:

1.申請互斥鎖,如果互斥鎖鎖定,阻塞等待

2.測試空間是否已滿

3.如條件滿足(非滿),執行操作,完成後解鎖互斥鎖

4.如果第二步不滿足,阻塞,等待非滿的條件變數

消費者執行緒如下:

1.申請互斥鎖,如果已鎖定,阻塞等待

2.測試空間是否為空

3.滿足非空,則執行操作,完成後解鎖

4.若第二步不滿足,阻塞當前程序,等待空間非空變數.

原始碼如下:

/*************************************************************************
> File Name: pthread_cond_exp.c
> Author:SuooL 
> Mail:[email protected] || [email protected]
> Website:http://blog.csdn.net/suool | http://suool.net
> Created Time: 2014年08月15日 星期五 13時27分13秒
> Description: 
************************************************************************/

#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <pthread.h>

#define BUFFER_SIZE 2

/* Circular buffer of integers. */
struct prodcons 
{
    int buffer[BUFFER_SIZE];      /* the actual data */
    pthread_mutex_t lock;         /* mutex ensuring exclusive access to buffer */
    int readpos, writepos;        /* positions for reading and writing */
    pthread_cond_t notempty;      /* signaled when buffer is not empty */
    pthread_cond_t notfull;       /* signaled when buffer is not full */
};

/* Initialize a buffer */
void init(struct prodcons *prod)
{
    pthread_mutex_init(&prod->lock,NULL);     // 初始化互斥鎖
    pthread_cond_init(&prod->notempty,NULL);  // 初始化條件變數
    pthread_cond_init(&prod->notfull,NULL);   // ....
    prod->readpos = 0;
    prod->writepos = 0;                        // 初始化讀寫操作位置
}
/* Store an integer in the buffer */
void put(struct prodcons * prod, int data)   // 輸入產品子函式
{
    pthread_mutex_lock(&prod->lock);       // 鎖定互斥鎖
    /* Wait until buffer is not full */
    while ((prod->writepos + 1) % BUFFER_SIZE == prod->readpos)  // 測試空間是否滿
    {
        printf("producer wait for not full\n");
        pthread_cond_wait(&prod->notfull, &prod->lock);  // 等待空間有空
    }
    /* Write the data and advance write pointer */
    prod->buffer[prod->writepos] = data;     // 寫資料
    prod->writepos++;                        // 寫位置加一
    if (prod->writepos >= BUFFER_SIZE)       // 如寫到尾部,返回
    prod->writepos = 0;
    /*Signal that the buffer is now not empty */
    pthread_cond_signal(&prod->notempty); // 傳送有資料訊號
    pthread_mutex_unlock(&prod->lock);          // 解鎖
}
/* Read and remove an integer from the buffer */
int get(struct prodcons *prod)
{
    int data;
    pthread_mutex_lock(&prod->lock);       // 鎖定
    /* Wait until buffer is not empty */
    while (prod->writepos == prod->readpos)           // 測試是否有資料
    {
        printf("consumer wait for not empty\n");
        pthread_cond_wait(&prod->notempty, &prod->lock); // 空則等待
    }
    /* Read the data and advance read pointer */
    data = prod->buffer[prod->readpos];     // 讀資料
    prod->readpos++;                        // 讀位置加一
    if (prod->readpos >= BUFFER_SIZE) 
    prod->readpos = 0;          // 讀到尾部,返回
    /* Signal that the buffer is now not full */
    pthread_cond_signal(&prod->notfull);  // 通知非滿
    pthread_mutex_unlock(&prod->lock);      // 解鎖
    return data;
}

#define OVER (-1)
struct prodcons buffer;
/*-------------------------------生產者-----------------------*/
void * producer(void * data)
{
    int n;
    for (n = 0; n < 5; n++)         //生產前五個產品 
    {
        printf("producer sleep 1 second......\n");
        sleep(1);      // 每秒產一個
        printf("put the %d product\n", n);
        put(&buffer, n);
    }
    for(n=5; n<10; n++)          // 生產後五個
    {
        printf("producer sleep 3 second......\n");
        sleep(3);              // 每三秒產三個
        printf("put the %d product\n",n);
        put(&buffer,n);
    }
    put(&buffer, OVER);
    printf("producer stopped!\n");
    return NULL;
}
/*--------------------------消費者----------------------------*/
void * consumer(void * data)
{
    int d=0;
    while (1) 
    {
        printf("consumer sleep 2 second......\n");
        sleep(2);             // 每2秒消費一個
        d=get(&buffer);
        printf("get the %d product\n", d);
        //		d = get(&buffer);
        if (d == OVER ) break;
    }
    printf("consumer stopped!\n");
    return NULL;
}
/*----------------------------生產者--------------------------*/
int main(int argc,char *argv[])
{
    pthread_t th_a, th_b;       // 執行緒定義
    void * retval;
    init(&buffer);
    pthread_create(&th_a, NULL, producer, 0);     // 建立生產執行緒
    pthread_create(&th_b, NULL, consumer, 0);       //  消費執行緒
    /* Wait until producer and consumer finish. */
    pthread_join(th_a, &retval);
    pthread_join(th_b, &retval);              // 等待執行緒結束
    return 0;
}

結果如下:


.