1. 程式人生 > >Linux下面的執行緒鎖,條件變數以及訊號量的使用

Linux下面的執行緒鎖,條件變數以及訊號量的使用

一) 執行緒鎖
1) 只能用於"鎖"住臨界程式碼區域
2) 一個執行緒加的鎖必須由該執行緒解鎖.

鎖幾乎是我們學習同步時最開始接觸到的一個策略,也是最簡單, 最直白的策略.

二) 條件變數,與鎖不同, 條件變數用於等待某個條件被觸發
1) 大體使用的偽碼:

// 執行緒一程式碼
pthread_mutex_lock(&mutex);
// 設定條件為true
pthread_cond_signal(&cond);
pthread_mutex_unlock(&mutex);

// 執行緒二程式碼
pthread_mutex_lock(&mutex);
while (條件為false)
    pthread_cond_wait(&cond, &mutex);
修改該條件
pthread_mutex_unlock(&mutex);

需要注意幾點:
1) 第二段程式碼之所以在pthread_cond_wait外面包含一個while迴圈不停測試條件是否成立的原因是, 在pthread_cond_wait被喚醒的時候可能該條件已經不成立.UNPV2對這個的描述是:"Notice that when pthread_cond_wait returns, we always test the condition again, because spurious wakeups can occur: a wakeup when the desired condition is still not true.".

2) pthread_cond_wait呼叫必須和某一個mutex一起呼叫, 這個mutex是在外部進行加鎖的mutex, 在呼叫pthread_cond_wait時, 內部的實現將首先將這個mutex解鎖, 然後等待條件變數被喚醒, 如果沒有被喚醒, 該執行緒將一直休眠, 也就是說, 該執行緒將一直阻塞在這個pthread_cond_wait呼叫中, 而當此執行緒被喚醒時, 將自動將這個mutex加鎖.
man文件中對這部分的說明是:
pthread_cond_wait atomically unlocks the mutex (as per pthread_unlock_mutex) and waits for the condition variable cond to  be  signaled.  The thread execution is suspended and does not consume any CPU time until the condition variable is
signaled. The mutex must be locked by the calling thread on entrance to pthread_cond_wait.  Before  returning  to  the calling thread, pthread_cond_wait re-acquires mutex (as per pthread_lock_mutex).
也就是說pthread_cond_wait實際上可以看作是以下幾個動作的合體:
解鎖執行緒鎖
等待條件為true
加鎖執行緒鎖.

這裡是使用條件變數的經典例子:
http://www.cppblog.com/CppExplore/archive/2008/03/20/44949.html
之所以使用兩個條件變數, 是因為有兩種情況需要進行保護,使用陣列實現迴圈佇列,因此一個條件是在getq函式中判斷讀寫指標相同且可讀資料計數為0,此時佇列為空沒有資料可讀,因此獲取新資料的條件變數就一直等待,另一個條件是讀寫指標相同且可讀資料計數大於0,此時佇列滿了不能再新增資料, 因此新增新資料的條件變數就一直等待,而nEmptyThreadNum和nFullThreadNum則是計數, 只有這個計數大於0時才會喚醒相應的條件變數,這樣可以減少呼叫pthread_cond_signal的次數.
為了在下面的敘述方便, 我將這段程式碼整理在下面, 是一個可以編譯執行的程式碼,但是注意需要在編譯時加上-pthread連結執行緒庫:
#include 
<pthread.h>
#include 
<stdio.h>
#include 
<unistd.h>
#include 
<stdlib.h>class CThreadQueue
{
public:
    CThreadQueue(
int queueSize=1024):
        sizeQueue(queueSize),lput(
0),lget(0),nFullThread(0),nEmptyThread(0),nData(0)
    {
        pthread_mutex_init(
&mux,0);
        pthread_cond_init(
&condGet,0);
        pthread_cond_init(
&condPut,0);
        buffer
=newvoid*[sizeQueue];
    }
    
virtual~CThreadQueue()
    {
        delete[] buffer;
    }
    
void* getq()
    {
        
void*data;
        pthread_mutex_lock(
&mux);
        /*
此 處迴圈判斷的原因如下:假設2個執行緒在getq阻塞,然後兩者都被啟用,而其中一個執行緒執行比較塊,快速消耗了2個數據,另一個執行緒醒來的時候已經沒有新 資料可以消耗了。另一點,man pthread_cond_wait可以看到,該函式可以被訊號中斷返回,此時返回EINTR。為避免以上任何一點,都必須醒來後再次判斷睡眠條件。更 正:pthread_cond_wait是訊號安全的系統呼叫,不會被訊號中斷。
        */
while(lget==lput&&nData==0)
        {
            nEmptyThread
++;
            pthread_cond_wait(
&condGet,&mux);
            nEmptyThread
--;     
        }

        data
=buffer[lget++];
        nData
--;
        
if(lget==sizeQueue)
        {
            lget
=0;
        }
        
if(nFullThread) //必要時才進行signal操作,勿總是signal        {
            pthread_cond_signal(
&condPut);    
        }
        pthread_mutex_unlock(
&mux);
        
return data;
    }
    
void putq(void*data)
    {
        pthread_mutex_lock(
&mux);
        
while(lput==lget&&nData)
        { 
            nFullThread
++;
            pthread_cond_wait(
&condPut,&mux);
            nFullThread
--;
        }
        buffer[lput
++]=data;
        nData
++;
        
if(lput==sizeQueue)
        {
            lput
=0;
        }
        
if(nEmptyThread)//必要時才進行signal操作,勿總是signal
        {
            pthread_cond_signal(
&condGet);
        }
        pthread_mutex_unlock(
&mux);
    }
private:
    pthread_mutex_t mux;
    pthread_cond_t condGet;
    pthread_cond_t condPut;

    
void** buffer;    //迴圈訊息佇列int sizeQueue;        //佇列大小int lput;        //location put  放資料的指標偏移int lget;        //location get  取資料的指標偏移int nFullThread;    //佇列滿,阻塞在putq處的執行緒數int nEmptyThread;    //佇列空,阻塞在getq處的執行緒數int nData;        //佇列中的訊息個數,主要用來判斷佇列空還是滿};

CThreadQueue queue;
//使用的時候給出稍大的CThreadQueue初始化引數,可以減少進入核心態的操作。void* produce(void* arg)
{
    
int i=0;
    pthread_detach(pthread_self());
    
while(i++<100)
    {
        queue.putq((
void*)i);
    }
}

void*consume(void*arg)
{
    
int data;
    
while(1)
    {
        data
=(int)(queue.getq());
        printf(
"data=%d\n",data);
    }
}

int main()
{    
    pthread_t pid;
    
int i=0;

    
while(i++<3)
        pthread_create(
&pid,0,produce,0);
    i
=0;
    
while(i++<3)
        pthread_create(
&pid,0,consume,0);
    sleep(
5);

    
return0;
}


三) 訊號量
訊號量既可以作為二值計數器(即0,1),也可以作為資源計數器.
主要是兩個函式:
sem_wait()  decrements  (locks)  the semaphore pointed to by sem.  If the semaphore's value is greater than zero, then
the decrement proceeds, and the function returns, immediately.  If the semaphore currently has the  value  zero,  then
the  call  blocks  until  either  it  becomes possible to perform the decrement (i.e., the semaphore value rises above
zero), or a signal handler interrupts the call.

sem_post()  increments  (unlocks)  the  semaphore  pointed  to  by sem.  If the semaphore's value consequently becomes
greater than zero, then another process or thread blocked in a sem_wait(3) call will be woken up and proceed  to  lock
the semaphore.

而函式int sem_getvalue(sem_t *sem, int *sval);則用於獲取訊號量當前的計數.

可以用訊號量模擬鎖和條件變數:
1) 鎖,在同一個執行緒內同時對某個訊號量先呼叫sem_wait再呼叫sem_post, 兩個函式呼叫其中的區域就是所要保護的臨界區程式碼了,這個時候其實訊號量是作為二值計數器來使用的.不過在此之前要初始化該訊號量計數為1,見下面例子中的程式碼.
2) 條件變數,在某個執行緒中呼叫sem_wait, 而在另一個執行緒中呼叫sem_post.

我們將上面例子中的執行緒鎖和條件變數都改成用訊號量實現以說明訊號量如何模擬兩者:
#include <pthread.h>
#include 
<stdio.h>
#include 
<unistd.h>
#include 
<stdlib.h>
#include 
<fcntl.h>
#include 
<sys/stat.h>
#include 
<semaphore.h>
#include 
<errno.h>
#include 
<string.h>class CThreadQueue
{
public:
    CThreadQueue(
int queueSize=1024):
        sizeQueue(queueSize),lput(
0),lget(0),nFullThread(0),nEmptyThread(0),nData(0)
    {
        
//pthread_mutex_init(&mux,0);        mux = sem_open("mutex", O_RDWR | O_CREAT);
        
get= sem_open("get", O_RDWR | O_CREAT);
        put 
= sem_open("put", O_RDWR | O_CREAT);
    
        sem_init(mux, 
01);

        buffer
=newvoid*[sizeQueue];
    }
    
virtual~CThreadQueue()
    {
        delete[] buffer;
        sem_unlink(
"mutex");
        sem_unlink(
"get");
        sem_unlink(
"put");
    }
    
void* getq()
    {
        
void*data;

        
//pthread_mutex_lock(&mux);        sem_wait(mux);

        
while(lget==lput&&nData==0)
        {
            nEmptyThread
++;
            
//pthread_cond_wait(&condGet,&mux);            sem_wait(get);
            nEmptyThread
--;     
        }

        data
=buffer[lget++];
        nData
--;
        
if(lget==sizeQueue)
        {
            lget
=0;
        }
        
if(nFullThread) //必要時才進行signal操作,勿總是signal        {
            
//pthread_cond_signal(&condPut);                sem_post(put);
        }

        
//pthread_mutex_unlock(&mux);        sem_post(mux);

        
return data;
    }
    
void putq(void*data)
    {
        
//pthread_mutex_lock(&mux);        sem_wait(mux);

        
while(lput==lget&&nData)
        { 
            nFullThread
++;
            
//pthread_cond_wait(&condPut,&mux);            sem_wait(put);
            nFullThread
--;
        }
        buffer[lput
++]=data;
        nData
++;
        
if(lput==sizeQueue)
        {
            lput
=0;
        }
        
if(nEmptyThread)
        {
            
//pthread_cond_signal(&condGet);            sem_post(get);
        }

        
//pthread_mutex_unlock(&mux);        sem_post(mux);
    }
private:
    
//pthread_mutex_t mux;    sem_t* mux;
    
//pthread_cond_t condGet;
    
//pthread_cond_t condPut;    sem_t*get;
    sem_t
* put;

    
void** buffer;    //迴圈訊息佇列int sizeQueue;        //佇列大小int lput;        //location put  放資料的指標偏移int lget;        //location get  取資料的指標偏移int nFullThread;    //佇列滿,阻塞在putq處的執行緒數int nEmptyThread;    //佇列空,阻塞在getq處的執行緒數int nData;        //佇列中的訊息個數,主要用來判斷佇列空還是滿};

CThreadQueue queue;
//使用的時候給出稍大的CThreadQueue初始化引數,可以減少進入核心態的操作。void* produce(void* arg)
{
    
int i=0;
    pthread_detach(pthread_self());
    
while(i++<100)
    {
        queue.putq((
void*)i);
    }
}

void*consume(void*arg)
{
    
int data;
    
while(1)
    {
        data
=(int)(queue.getq());
        printf(
"data=%d\n",data);
    }
}

int main()
{    
    pthread_t pid;
    
int i=0;

    
while(i++<3)
        pthread_create(
&pid,0,produce,0);
    i
=0;
    
while(i++<3)
        pthread_create(
&pid,0,consume,0);
    sleep(
5);

    
return0;
}


不過, 訊號量除了可以作為二值計數器用於模擬執行緒鎖和條件變數之外, 還有比它們更加強大的功能, 訊號量可以用做資源計數器, 也就是說初始化訊號量的值為某個資源當前可用的數量, 使用了一個之後遞減, 歸還了一個之後遞增, 將前面的例子用資源計數器的形式再次改寫如下,注意在初始化的時候要將資源計數進行初始化, 在下面程式碼中的建構函式中將put初始化為佇列的最大數量, 而get為0:
#include <pthread.h>
#include 
<stdio.h>
#include 
<unistd.h>
#include 
<stdlib.h>
#include 
<fcntl.h>
#include 
<sys/stat.h>
#include 
<semaphore.h>class CThreadQueue
{
public:
    CThreadQueue(
int queueSize=1024):
        sizeQueue(queueSize),lput(
0),lget(0)
    {
        pthread_mutex_init(
&mux,0);
        
get= sem_open("get", O_RDWR | O_CREAT);
        put 
= sem_open("put", O_RDWR | O_CREAT);

        sem_init(
get00);
        sem_init(put, 
0, sizeQueue);

        buffer
=newvoid*[sizeQueue];
    }
    
virtual~CThreadQueue()
    {
        sem_unlink(
"get");
        sem_unlink(
"put");
        delete[] buffer;
    }
    
void* getq()
    {
        sem_wait(
get);

        
void*data;

        pthread_mutex_lock(
&mux);

        
/*
        while(lget==lput&&nData==0)
        {
            nEmptyThread++;
            //pthread_cond_wait(&condGet,&mux);
            nEmptyThread--;     
        }
        
*/

        data
=buffer[lget++];
        
//nData--;if(lget==sizeQueue)
        {
            lget
=0;
        }
        
/*
        if(nFullThread) //必要時才進行signal操作,勿總是signal
        {
            //pthread_cond_signal(&condPut);    
            sem_post(put);
        }
        
*/
        pthread_mutex_unlock(
&mux);

        sem_post(put);

        
return data;
    }
    
void putq(void*data)
    {
        sem_wait(put);

        pthread_mutex_lock(
&mux);

        
/*
        while(lput==lget&&nData)
        { 
            nFullThread++;
            //pthread_cond_wait(&condPut,&mux);
            sem_wait(put);
            nFullThread--;
        }
        
*/

        buffer[lput
++]=data;
        
//nData++;if(lput==sizeQueue)
        {
            lput
=0;
        }
        
/*
        if(nEmptyThread)
        {
            //pthread_cond_signal(&condGet);
            sem_post(get);
        }
        
*/

        pthread_mutex_unlock(
&mux);

        sem_post(
get);
    }
private:
    pthread_mutex_t mux;
    
//pthread_cond_t condGet;
    
//pthread_cond_t condPut;    sem_t*get;
    sem_t
* put;

    
void** buffer;    //迴圈訊息佇列int sizeQueue;        //佇列大小int lput;        //location put  放資料的指標偏移int lget;        //location get  取資料的指標偏移};

CThreadQueue queue;
//使用的時候給出稍大的CThreadQueue初始化引數,可以減少進入核心態的操作。void* produce(void* arg)
{
    
int i=0;
    pthread_detach(pthread_self());
    
while(i++<100)
    {
        queue.putq((
void*)i);
    }
}

void*consume(void*arg)
{
    
int data;
    
while(1)
    {
        data
=(int)(queue.getq());
        printf(
"data=%d\n",data);
    }
}

int main()
{    
    pthread_t pid;
    
int i=0;

    
while(i++<3)
        pthread_create(
&pid,0,produce,0);
    i
=0;
    
while(i++<3)
        pthread_create(
&pid,0,consume,0);
    sleep(
5);

    
return0;
}

可以看見,採用訊號量作為資源計數之後, 程式碼變得"很直白",原來的一些儲存佇列狀態的變數都不再需要了.

訊號量與執行緒鎖,條件變數相比還有以下幾點不同:
1)鎖必須是同一個執行緒獲取以及釋放, 否則會死鎖.而條件變數和訊號量則不必.
2)訊號的遞增與減少會被系統自動記住, 系統內部有一個計數器實現訊號量,不必擔心會丟失, 而喚醒一個條件變數時,如果沒有相應的執行緒在等待該條件變數, 這次喚醒將被丟失.