1. 程式人生 > >Linux多執行緒實踐(8) --Posix條件變數解決生產者消費者問題

Linux多執行緒實踐(8) --Posix條件變數解決生產者消費者問題

Posix條件變數

int pthread_cond_init(pthread_cond_t *cond, pthread_condattr_t *cond_attr);
int pthread_cond_destroy(pthread_cond_t *cond);

int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);
int pthread_cond_timedwait(pthread_cond_t *cond,  pthread_mutex_t  *mutex,  const  struct timespec *abstime);

int pthread_cond_signal(pthread_cond_t *cond);
int pthread_cond_broadcast(pthread_cond_t *cond);

    與互斥鎖不同,條件變數是用來等待而不是用來上鎖的。條件變數用來自動阻塞呼叫執行緒, 直到條件變數所要求的情況發生為止。通常條件變數需要和互斥鎖同時使用, 利用互斥量保護條件變數;

  條件的檢測是在互斥鎖的保護下進行的。如果一個條件為假,一個執行緒自動阻塞,並釋放等待狀態改變的互斥鎖。如果另一個執行緒改變了條件,它就傳送訊號給關聯的條件變數, 並喚醒一個或多個等待在該條件變數上的執行緒,這些執行緒將重新獲得互斥鎖,重新評價條件。如果將條件變數放到共享記憶體中, 而兩程序可共享讀寫這段記憶體,則條件變數可以被用來實現兩程序間的執行緒同步。

條件變數使用規範

1.等待條件程式碼

pthread_mutex_lock(&mutex);

while (條件為假)
{
    pthread_cond_wait(&cond, &mutex);
}
修改條件

pthread_mutex_unlock(&mutex);

/**解釋: 為什麼使用while, 而不用if?

Man-Page給出了答案: If a signal is delivered to a thread waiting for a condition variable, upon return from 

the signal handler the thread resumes waiting for the condition variable as if it was not interrupted, or 

it shall return zero due to spurious wakeup.

即是說如果正在等待條件變數的一個執行緒收到一個訊號,從訊號處理函式返回的時候執行緒應該重新等待條件變數就好象沒有被中斷一樣,或者被虛假地喚醒返回0

。如果是上述情形,那麼其實條件並未被改變,那麼此時如果沒有繼續判斷一下條件的真假就繼續向下執行的話,修改條件將會出現問題,所以需要使用while 迴圈再判斷一下,如果條件還是為假必須繼續等待。

注:在多處理器系統中,pthread_cond_signal 可能會喚醒多個等待條件的執行緒,這也是一種spurious wakeup。

**/

2.給條件傳送訊號程式碼

pthread_mutex_lock(&mutex);

設定條件為真
pthread_cond_signal(&cond);

pthread_mutex_unlock(&mutex);

條件變數API說明

1.pthread_cond_init

使用條件變數之前要先進行初始化:可以在單個語句中生成和初始化一個條件變數如:

  pthread_cond_t my_condition=PTHREAD_COND_INITIALIZER; //用於程序間執行緒的通訊;

或用函式pthread_cond_init進行動態初始化;

2.pthread_cond_destroy

該函式可以用來摧毀所指定的條件變數,同時將會釋放所給它分配的資源。呼叫該函式的程序並不要求等待在引數所指定的條件變數上;

3.pthread_cond_wait && pthread_cond_timedwait

cond_wait原語完成三件事:

(1)對mutex解鎖;

(2)等待條件, 直到有執行緒向他傳送通知;

(3)當wait返回時, 再對mutex重新加鎖;

第一個引數cond是指向一個條件變數的指標。第二個引數mutex則是對相關的互斥鎖的指標。

函式pthread_cond_timedwait函式型別與函式pthread_cond_wait區別在於:timedwait多了一個超時, 超時值制訂了我們願意等待多長時間, 如果達到或是超過所引用的引數*abstime,它將結束阻塞並返回錯誤ETIME.

//timespec結構如下:
struct timespec
{
    time_t   tv_sec;        /* seconds */
    long     tv_nsec;       /* nanoseconds */
};

注意: 這個時間值是一個絕對數而不是相對數, 例如, 假設願意等待三秒鐘, 那麼並不是把3秒鐘轉換成timespec結構, 而是需要將當前實踐加上3分鐘再轉換成timespec結構, 這個獲取當前時間值的函式可以是clock_gettime(我們採用這一個)也可以是gettimeofday.

4.pthread_cond_signal && pthread_cond_broadcast

cond_signal原語所完成的操作:

向第一個等待條件的執行緒發起通知, 如果沒有任何一個執行緒處於等待條件的狀態, 那麼這個通知將被忽略;

cond_broadcast:

向所有等待在該條件上的執行緒傳送通知;

引數cond是一個條件變數的指標。當呼叫signal時, 一個在相同條件變數上阻塞的執行緒將被解鎖。如果同時有多個執行緒阻塞,則由排程策略確定接收通知的執行緒。如果呼叫broadcast,則將通知阻塞在這個條件變數上的所有執行緒。一旦被喚醒,執行緒仍然會要求互斥鎖。如果當前沒有執行緒等待通知,則上面兩種呼叫實際上成為一個空操作, 核心會將條件變數的通知忽略(如果引數*cond指向非法地址,則返回值EINVAL);

類Condition封裝

//Condition類設計
class Condition
{
public:
    Condition(const pthread_mutexattr_t *mutexAttr = NULL,
              const pthread_condattr_t  *condAttr = NULL);
    ~Condition();

    //條件變數函式
    int signal();
    int broadcast();
    int wait();
    int timedwait(int seconds);

    //互斥量函式
    int lock();
    int trylock();
    int unlock();

private:
    pthread_mutex_t m_mutex;
    pthread_cond_t  m_cond;
};
//Condition類實現
Condition::Condition(const pthread_mutexattr_t *mutexAttr,
                     const pthread_condattr_t  *condAttr)
{
    //初始化互斥量
    pthread_mutex_init(&m_mutex, mutexAttr);
    //初始化條件變數
    pthread_cond_init(&m_cond, condAttr);
}
Condition::~Condition()
{
    //銷燬互斥量
    pthread_mutex_destroy(&m_mutex);
    //銷燬條件變數
    pthread_cond_destroy(&m_cond);
}
int Condition::signal()
{
    return pthread_cond_signal(&m_cond);
}
int Condition::broadcast()
{
    return pthread_cond_broadcast(&m_cond);
}
int Condition::wait()
{
    return pthread_cond_wait(&m_cond, &m_mutex);
}
int Condition::timedwait(int seconds)
{
    //獲取當前時間
    struct timespec abstime;
    clock_gettime(CLOCK_REALTIME, &abstime);
    //將當前時間加上需要等待的秒數, 構成絕對時間值
    abstime.tv_sec += seconds;
    return pthread_cond_timedwait(&m_cond, &m_mutex, &abstime);
}

int Condition::lock()
{
    return pthread_mutex_lock(&m_mutex);
}
int Condition::trylock()
{
    return pthread_mutex_trylock(&m_mutex);
}
int Condition::unlock()
{
    return pthread_mutex_unlock(&m_mutex);
}

生產者消費者問題(無界緩衝區)

/** 實現: 我們假設是緩衝區是無界的
說明:生產者可以不停地生產,使用pthread_cond_signal  發出通知的時候,如果此時沒有消費者執行緒在等待條件,那麼這個通知將被丟棄,但也不影響整體程式碼的執行,沒有消費者執行緒在等待,說明產品資源充足,即while 判斷失敗,不會進入等待狀態,直接消費產品(即修改條件)。
**/
const unsigned int PRODUCER_COUNT = 5;	//生產者個數
const unsigned int CONSUMER_COUNT = 3;	//消費者個數

//定義Condition類
Condition cond;
//緩衝區 ~O(∩_∩)O~
int nReady = 0;
//消費者
void *consumer(void *args)
{
    int id = *(int *)args;
    delete (int *)args;
    while (true)
    {
        cond.lock();    //鎖定mutex
        while (!(nReady > 0))
        {
            printf("-- thread %d wait...\n", id);
            cond.wait();    //等待條件變數
        }

        printf("** thread %d alive, and consume product %d ...\n", id, nReady);
        -- nReady;  //消費
        printf("   thread %d end consume... \n\n", id);

        cond.unlock();  //解鎖mutex
        sleep(1);
    }
    pthread_exit(NULL);
}

//生產者
void *producer(void *args)
{
    int id = *(int *)args;
    delete (int *)args;
    while (true)
    {
        cond.lock();    //鎖定mutex

        printf("++ thread %d signal, and produce product %d ...\n", id, nReady+1);
        ++ nReady;      //生產
        cond.signal();  //傳送條件變數訊號
        printf("   thread %d end produce, signal...\n\n", id);
        cond.unlock();  //解鎖mutex
        sleep(1);
    }
    pthread_exit(NULL);
}

int main()
{
    pthread_t thread[PRODUCER_COUNT+CONSUMER_COUNT];

    //首先生成消費者
    for (unsigned int i = 0; i < CONSUMER_COUNT; ++i)
        pthread_create(&thread[i], NULL, consumer, new int(i));
    sleep(1);   //使生產者等待一段時間, 加速消費者等待事件產生
    //然後生成生產者
    for (unsigned int i = 0; i < PRODUCER_COUNT; ++i)
        pthread_create(&thread[CONSUMER_COUNT+i], NULL, producer, new int(i));
    for (unsigned int i = 0; i < PRODUCER_COUNT+CONSUMER_COUNT; ++i)
        pthread_join(thread[i], NULL);
}