1. 程式人生 > >Linux系統編程——線程(2)

Linux系統編程——線程(2)

運算 fin 成功 html 內容 while 順序 機制 cast

目錄

  • Linux系統編程——線程(2)
    • 同步概念
      • 線程同步
      • 數據混亂原因:
    • 互斥量mutex
      • pthread_mutex_init函數
      • pthread_mutex_destroy函數
      • pthread_mutex_lock函數
      • pthread_mutex_unlock函數
      • pthread_mutex_trylock函數
    • 線程加鎖後被取消
    • 多線程售票功能(互斥鎖)
    • 條件變量
      • pthread_cond_init函數
      • pthread_cond_destroy函數
      • pthread_cond_wait函數(※)
      • pthread_cond_timedwait函數
      • pthread_cond_signal函數
      • pthread_cond_broadcast函數
      • 條件變量的優點
    • 生產者消費者模型

Linux系統編程——線程(2)

前情提要: Linux系統編程——線程(1)


同步概念

? 所謂同步,即同時起步,協調一致。不同的對象,對“同步”的理解方式略有不同。如,設備同步,是指在兩個設備之間規定一個共同的時間參考;數據庫同步,是指讓兩個或多個數據庫內容保持一致,或者按需要部分保持一致;文件同步,是指讓兩個或多個文件夾裏的文件保持一致。

? 而編程中、通信中所說的同步與生活中大家印象中的同步概念略有差異。“同”字應是指協同、協助、互相配合。主旨在協同步調,按預定的先後次序運行。

線程同步

? 同步即協同步調,按預定的先後次序運行。

? 線程同步,指一個線程發出某一功能調用時,在沒有得到結果之前,該調用不返回。同時其它線程為保證數據一致性,不能調用該功能。

舉例:

#include <func.h>
//主線程與子線程各加1000萬
#define N 10000000
void* threadFunc(void *p)
{
    int *num=(int*)p;
    int i;
    for(i=0;i<N;i++)
    {
        *num+=1;
    }
    printf("I am child thread\n");
    return NULL;
}

int main()
{
    pthread_t pthID;//線程ID
    int ret;
    int num=0;
    ret=pthread_create(&pthID,NULL,threadFunc,&num);
    if(ret!=0)
    {
        printf("pthread_create:%s\n",strerror(ret));
        return -1;
    }
    int i;
    for(i=0;i<N;i++)
    {
        num=num+1;
    }
    pthread_join(pthID,NULL);
    printf("I am main thread,%d\n",num);
    return 0;
}

技術分享圖片

? “同步”的目的,是為了避免數據混亂,解決與時間有關的錯誤。實際上,不僅線程間需要同步,進程間、信號間等等都需要同步機制。

? 因此,所有“多個控制流,共同操作一個共享資源”的情況,都需要同步。

數據混亂原因:

  1. 資源共享(獨享資源則不會)

  2. 調度隨機(意味著數據訪問會出現競爭)

  3. 線程間缺乏必要的同步機制。

? 以上3點中,前兩點不能改變,欲提高效率,傳遞數據,資源必須共享。只要共享資源,就一定會出現競爭。只要存在競爭關系,數據就很容易出現混亂。

? 所以只能從第三點著手解決。使多個線程在訪問共享資源的時候,出現互斥。

互斥量mutex

Linux中提供一把互斥鎖mutex(也稱之為互斥量)。

每個線程在對資源操作前都嘗試先加鎖,成功加鎖才能操作,操作結束解鎖。

資源還是共享的,線程間也還是競爭的。

pthread_mutex_init函數

pthread_mutex_destroy函數

pthread_mutex_lock函數

pthread_mutex_trylock函數

pthread_mutex_unlock函數

以上5個函數的返回值都是:成功返回0, 失敗返回錯誤號

pthread_mutex_t 類型,其本質是一個結構體。為簡化理解,應用時可忽略其實現細節,簡單當成整數看待。

pthread_mutex_t mutex; 變量mutex只有兩種取值1、0。

pthread_mutex_init函數

初始化一個互斥鎖(互斥量) ---> 初值可看作1

? int pthread_mutex_init(pthread_mutex_t restrict mutex, const pthread_mutexattr_t restrict attr);

參1:傳出參數,調用時應傳 &mutex

restrict關鍵字:只用於限制指針,告訴編譯器,所有修改該指針指向內存中內容的操作,只能通過本指針完成。不能通過除本指針以外的其他變量或指針修改

參2:互斥量屬性。是一個傳入參數,通常傳NULL,選用默認屬性(線程間共享)。 參APUE.12.4同步屬性

  1. 靜態初始化:如果互斥鎖 mutex 是靜態分配的(定義在全局,或加了static關鍵字修飾),可以直接使用宏進行初始化。e.g. pthead_mutex_t muetx = PTHREAD_MUTEX_INITIALIZER;

  2. 動態初始化:局部變量應采用動態初始化。e.g. pthread_mutex_init(&mutex, NULL)

pthread_mutex_destroy函數

銷毀一個互斥鎖

? int pthread_mutex_destroy(pthread_mutex_t *mutex);

#include <func.h>

int main()
{
    pthread_mutex_t mutex;
    int ret;
    ret=pthread_mutex_init(&mutex,NULL);
    THREAD_ERROR_CHECK(ret,"pthread_mutex_init");
    ret=pthread_mutex_destroy(&mutex);
    THREAD_ERROR_CHECK(ret,"pthread_mutex_destroy");
    return 0;
}

pthread_mutex_lock函數

加鎖。可理解為將mutex--(或-1)

? int pthread_mutex_lock(pthread_mutex_t *mutex);

#include <func.h>

int main()
{
    pthread_mutex_t mutex;
    int ret;
    ret=pthread_mutex_init(&mutex,NULL);
    THREAD_ERROR_CHECK(ret,"pthread_mutex_init");
    pthread_mutex_lock(&mutex);
    pthread_mutex_lock(&mutex);//會阻塞
    printf("you can't see me\n");
    ret=pthread_mutex_destroy(&mutex);
    THREAD_ERROR_CHECK(ret,"pthread_mutex_destroy");
    return 0;
}

pthread_mutex_unlock函數

解鎖。可理解為將mutex ++(或+1)

? int pthread_mutex_unlock(pthread_mutex_t *mutex);

主線程與子線程實現對同一變量的加法運算:

#include <func.h>
//主線程與子線程各加1000萬
#define N 10000000
typedef struct{
    int num;
    pthread_mutex_t mutex;
}Data;
void* threadFunc(void *p)
{
    Data *pThreadInfo=(Data *)p;
    int i;
    for(i=0;i<N;i++)
    {
        pthread_mutex_lock(&pThreadInfo->mutex);
        pThreadInfo->num+=1;
        pthread_mutex_unlock(&pThreadInfo->mutex);
    }
    printf("I am child thread\n");
    return NULL;
}

int main()
{
    pthread_t pthID;//線程ID
    int ret;
    Data threadInfo;
    threadInfo.num=0;
    ret=pthread_mutex_init(&threadInfo.mutex,NULL);
    THREAD_ERROR_CHECK(ret,"pthread_mutex_init");
    struct timeval start,end;
    gettimeofday(&start,NULL);
    ret=pthread_create(&pthID,NULL,threadFunc,&threadInfo);
    if(ret!=0)
    {
        printf("pthread_create:%s\n",strerror(ret));
        return -1;
    }
    int i;
    for(i=0;i<N;i++)
    {
        pthread_mutex_lock(&threadInfo.mutex);
        threadInfo.num+=1;
        pthread_mutex_unlock(&threadInfo.mutex);
    }
    pthread_join(pthID,NULL);
    gettimeofday(&end,NULL);
    ret=pthread_mutex_destroy(&threadInfo.mutex);
    THREAD_ERROR_CHECK(ret,"pthread_mutex_destroy");
    printf("I am main thread,%d,use time=%ld\n",threadInfo.num,(end.tv_sec-start.tv_sec)*1000000+end.tv_usec-start.tv_usec);
    return 0;
}

技術分享圖片

pthread_mutex_trylock函數

嘗試加鎖

? int pthread_mutex_trylock(pthread_mutex_t *mutex);

#include <func.h>
//trylock嘗試加鎖
int main()
{
    pthread_mutex_t mutex;
    int ret;
    ret=pthread_mutex_init(&mutex,NULL);
    THREAD_ERROR_CHECK(ret,"pthread_mutex_init");
    pthread_mutex_lock(&mutex);
    ret=pthread_mutex_trylock(&mutex);
    THREAD_ERROR_CHECK(ret,"pthread_mutex_trylock");
    ret=pthread_mutex_destroy(&mutex);
    THREAD_ERROR_CHECK(ret,"pthread_mutex_destroy");
    return 0;
}

技術分享圖片

線程加鎖後被取消

? 線程取消的方法是一個線程向目標線程發cancel 信號,但是如何處理cancel 信號則由目標線程自己決

定,目標線程或者忽略、或者立即終止、或者繼續運行至cancelation-point(取消點)後終止。

? 線程為了訪問臨界共享資源而為其加上鎖,但在訪問過程中該線程被外界取消,或者發生了中斷,則該

臨界資源將永遠處於鎖定狀態得不到釋放。外界取消操作是不可預見的,因此的確需要一個機制來簡化用於

資源釋放的編程。

? 在POSIX 線程API 中提供了一個pthread_cleanup_push()/pthread_cleanup_pop()函數對用於自動釋

放資源,從pthread_cleanup_push()的調用點到pthread_cleanup_pop()之間的程序段中的終止動作都將執行

pthread_cleanup_push()所指定的清理函數。

函數原型:

void pthread_cleanup_push(void (*routine) (void *), void *arg)

void pthread_cleanup_pop(int execute)

pthread_cleanup_push()/pthread_cleanup_pop()采用先入後出的棧結構管理,並且必須成對出現

void routine(void *arg)函數在調用pthread_cleanup_push()時壓入清理函數棧,多次對

pthread_cleanup_push()的調用將在清理函數棧中形成一個函數鏈,在執行該函數鏈時按照壓棧的相反

順序彈出。execute 參數表示執行到pthread_cleanup_pop()時是否在彈出清理函數的同時執行該函數,

為0 表示不執行,非0 為執行;這個參數並不影響異常終止時清理函數的執行。

#include <func.h>
void cleanup(void *p)
{
    pthread_mutex_unlock((pthread_mutex_t*)p);
    printf("unlock success\n");
}
//線程加鎖以後被cancel怎麽辦
void* threadFunc(void *p)
{
    pthread_mutex_t* pMutex=(pthread_mutex_t*)p;
    pthread_cleanup_push(cleanup,pMutex); //指定清理函數cleanup
    pthread_mutex_lock(pMutex);
    sleep(3);
    pthread_cleanup_pop(1);
    pthread_exit(NULL);
}
int main()
{
    pthread_mutex_t mutex;
    int ret;
    ret=pthread_mutex_init(&mutex,NULL);
    THREAD_ERROR_CHECK(ret,"pthread_mutex_init");
    pthread_t pthId[2];
    int i;
    for(i=0;i<2;i++)
    {
        pthread_create(pthId+i,NULL,threadFunc,&mutex);
    }
    for(i=0;i<2;i++)
    {
        ret=pthread_cancel(pthId[i]);
        THREAD_ERROR_CHECK(ret,"pthread_cancel");
    }
    for(i=0;i<2;i++)
    {
        pthread_join(pthId[i],NULL);
    }
    ret=pthread_mutex_destroy(&mutex);
    THREAD_ERROR_CHECK(ret,"pthread_mutex_destroy");
    return 0;
}

技術分享圖片

多線程售票功能(互斥鎖)

#include <func.h>

typedef struct{
    int tickets;
    pthread_mutex_t mutex;
}Train;
void* saleWindows1(void* p)
{
    Train *pSale=(Train*)p;
    int count=0;
    while(1)
    {
        pthread_mutex_lock(&pSale->mutex);
        if(pSale->tickets>0)
        {
            //printf("I am saleWindows1,the tickets is %d\n",pSale->tickets);
            pSale->tickets--;
            count++;
            //printf("sale finish,I am saleWindows1,the tickets is %d\n",pSale->tickets);
            pthread_mutex_unlock(&pSale->mutex);
        }else{
            pthread_mutex_unlock(&pSale->mutex);
            printf("I am windows1 sale %d\n",count);
            break;
        }
    }
    return NULL;
}
void* saleWindows2(void* p)
{
    Train *pSale=(Train*)p;
    int count=0;
    while(1)
    {
        pthread_mutex_lock(&pSale->mutex);
        if(pSale->tickets>0)
        {
            //printf("I am saleWindows2,the tickets is %d\n",pSale->tickets);
            pSale->tickets--;
            count++;
            //printf("sale finish,I am saleWindows2,the tickets is %d\n",pSale->tickets);
            pthread_mutex_unlock(&pSale->mutex);
        }else{
            pthread_mutex_unlock(&pSale->mutex);
            printf("I am windows2 sale %d\n",count);
            break;
            break;
        }
    }
    return NULL;
}

typedef void* (*threadFunc)(void*);
int main()
{
    Train t;
    pthread_t pthId[2];
    int i;
    t.tickets=20000000;
    threadFunc pthreadFunc[2]={saleWindows1,saleWindows2};
    pthread_mutex_init(&t.mutex,NULL);
    for(i=0;i<2;i++)
    {
        pthread_create(pthId+i,NULL,pthreadFunc[i],&t);
    }
    for(i=0;i<2;i++)
    {
        pthread_join(pthId[i],NULL);
    }
    printf("sale over\n");
}

條件變量

? 條件變量是利用線程間共享的全局變量進行同步的一種機制,主要包括兩個動作:一個線程等待條件變

量的條件成立而掛起;另一個線程使條件成立(給出條件成立信號)。為了防止競爭,條件變量的使用總是

和一個互斥鎖結合在一起。

? pthread_cond_init函數

? pthread_cond_destroy函數

? pthread_cond_wait函數

? pthread_cond_timedwait函數

? pthread_cond_signal函數

? pthread_cond_broadcast函數

以上6 個函數的返回值都是:成功返回0, 失敗直接返回錯誤號。

? pthread_cond_t類型 用於定義條件變量

? pthread_cond_t cond;

pthread_cond_init函數

初始化一個條件變量

int pthread_cond_init(pthread_cond_t restrict cond, const pthread_condattr_t restrict attr);

參2:attr表條件變量屬性,通常為默認值,傳NULL即可

也可以使用靜態初始化的方法,初始化條件變量:

pthread_cond_t cond = PTHREAD_COND_INITIALIZER;

pthread_cond_destroy函數

銷毀一個條件變量

int pthread_cond_destroy(pthread_cond_t *cond);

pthread_cond_wait函數(※)

阻塞等待一個條件變量

? int pthread_cond_wait(pthread_cond_t restrict cond, pthread_mutex_t restrict mutex);

函數作用:

  1. 阻塞等待條件變量cond(參1)滿足

  2. 釋放已掌握的互斥鎖(解鎖互斥量)相當於pthread_mutex_unlock(&mutex);

1.2.兩步為一個原子操作。

  1. 當被喚醒,pthread_cond_wait函數返回時,解除阻塞並重新申請獲取互斥鎖pthread_mutex_lock(&mutex);

先舉一個簡單的例子:

#include <func.h>
typedef struct{
    pthread_cond_t cond;
    pthread_mutex_t mutex;
}Data;
void* threadFunc(void* p)
{
    Data* pthreadInfo=(Data*)p;
    int ret;
    pthread_mutex_lock(&pthreadInfo->mutex);
    ret=pthread_cond_wait(&pthreadInfo->cond,&pthreadInfo->mutex);
    pthread_mutex_unlock(&pthreadInfo->mutex);
    printf("I am child,after wait\n");
    pthread_exit(NULL);
}
int main()
{
    Data threadInfo;
    int ret;
    ret=pthread_cond_init(&threadInfo.cond,NULL);
    THREAD_ERROR_CHECK(ret,"pthread_cond_init");
    pthread_mutex_init(&threadInfo.mutex,NULL);
    pthread_t pthId;
    pthread_create(&pthId,NULL,threadFunc,&threadInfo);
    sleep(1);
    ret=pthread_cond_signal(&threadInfo.cond);
    THREAD_ERROR_CHECK(ret,"pthread_cond_signal");
    printf("send signal ok\n");
    pthread_join(pthId,NULL);
    printf("I am main thread\n");
    return 0;
}

技術分享圖片

代碼解釋:主線程創建結構體,包含mutex和cond,初始化後創建子線程,子線程上鎖,並執行pthread_cond_wait阻塞等待條件變量的到來,sleep(1)後主線程發送signal信號使條件變量成立,故打印出 send signal ok和I am child,after wait,最後用join回收子線程。

pthread_cond_timedwait函數

功能:計時等待一個條件變量。線程解開mutex 指向的鎖並被條件變量cond 阻塞。其中計時等待方式表示經歷abstime 段時間後,即使條件變量不滿足,阻塞也被解除

函數原型:

int pthread_cond_timedwait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex, const struct timespec *restrict abstime);

參3: 參看man sem_timedwait函數,查看struct timespec結構體。

? struct timespec {

? time_t tv_sec; /* seconds */ 秒

? long tv_nsec; /* nanosecondes*/ 納秒

? }

形參abstime:絕對時間。

如:time(NULL)返回的就是絕對時間。而alarm(1)是相對時間,相對當前時間定時1秒鐘。

struct timespec t = {1, 0};

pthread_cond_timedwait (&cond, &mutex, &t);只能定時到 1970年1月1日 00:00:01秒(早已經過去)

正確用法:

time_t cur = time(NULL); //獲取當前時間。

struct timespec t;    //定義timespec 結構體變量t

t.tv_sec = cur+1; //定時1秒

pthread_cond_timedwait (&cond, &mutex, &t); //傳參                           

struct timeval {

    time_t  tv_sec;  /* seconds * 秒/

    suseconds_t tv_usec;   /* microseconds * 微秒/

};

示例:

#include <func.h>
typedef struct{
    pthread_cond_t cond;
    pthread_mutex_t mutex;
}Data;
void* threadFunc(void* p)
{
    Data* pthreadInfo=(Data*)p;
    int ret;
    struct timespec t;
    t.tv_nsec=0;
    t.tv_sec=time(NULL)+5;
    pthread_mutex_lock(&pthreadInfo->mutex);
    ret=pthread_cond_timedwait(&pthreadInfo->cond,&pthreadInfo->mutex,&t);
    printf("pthread_cond_timedwait ret=%d\n",ret);
    pthread_mutex_unlock(&pthreadInfo->mutex);
    printf("I am child,after wait\n");
    pthread_exit(NULL);
}
int main()
{
    Data threadInfo;
    int ret;
    ret=pthread_cond_init(&threadInfo.cond,NULL);
    THREAD_ERROR_CHECK(ret,"pthread_cond_init");
    pthread_mutex_init(&threadInfo.mutex,NULL);
    pthread_t pthId;
    pthread_create(&pthId,NULL,threadFunc,&threadInfo);
    sleep(10);
    ret=pthread_cond_signal(&threadInfo.cond);
    THREAD_ERROR_CHECK(ret,"pthread_cond_signal");
    printf("send signal ok\n");
    pthread_join(pthId,NULL);
    printf("I am main thread\n");
    return 0;
}

技術分享圖片

代碼解釋:主線程控制10s後發signal但子線程設置5秒計時,超時則自動解鎖,故程序啟動後過了5秒後打印I am child,after wait,再過5秒後打印剩余部分!

pthread_cond_signal函數

喚醒至少一個阻塞在條件變量上的線程

int pthread_cond_signal(pthread_cond_t *cond);

pthread_cond_broadcast函數

喚醒全部阻塞在條件變量上的線程

? int pthread_cond_broadcast(pthread_cond_t *cond);

條件變量的優點

? 相較於mutex而言,條件變量可以減少競爭。

? 如直接使用mutex,除了生產者、消費者之間要競爭互斥量以外,消費者之間也需要競爭互斥量,但如

果匯聚(鏈表)中沒有數據,消費者之間競爭互斥鎖是無意義的。有了條件變量機制以後,只有生產者完成

生產,才會引起消費者之間的競爭。提高了程序效率。

生產者消費者模型

技術分享圖片

通過設置條件變量,在售票為空後自動補票。

#include <func.h>

typedef struct{
    int tickets;
    pthread_mutex_t mutex;
    pthread_cond_t cond;
}Train;
void* saleWindows1(void* p)
{
    Train *pSale=(Train*)p;
    int count=0;
    while(1)
    {
        pthread_mutex_lock(&pSale->mutex);
        if(pSale->tickets>0)
        {
            printf("I am saleWindows1,the tickets is %d\n",pSale->tickets);
            pSale->tickets--;
            if(0==pSale->tickets)
            {
                pthread_cond_signal(&pSale->cond);
            }
            count++;
            printf("sale finish,I am saleWindows1,the tickets is %d\n",pSale->tickets);
            pthread_mutex_unlock(&pSale->mutex);
        }else{
            pthread_mutex_unlock(&pSale->mutex);
            printf("I am windows1 sale %d\n",count);
            break;
        }
        sleep(1);
    }
    return NULL;
}
void* saleWindows2(void* p)
{
    Train *pSale=(Train*)p;
    int count=0;
    while(1)
    {
        pthread_mutex_lock(&pSale->mutex);
        if(pSale->tickets>0)
        {
            printf("I am saleWindows2,the tickets is %d\n",pSale->tickets);
            pSale->tickets--;
            if(0==pSale->tickets)
            {
                pthread_cond_signal(&pSale->cond);
            }
            count++;
            printf("sale finish,I am saleWindows2,the tickets is %d\n",pSale->tickets);
            pthread_mutex_unlock(&pSale->mutex);
        }else{
            pthread_mutex_unlock(&pSale->mutex);
            printf("I am windows2 sale %d\n",count);
            break;
        }
        sleep(1);
    }
    return NULL;
}
void* setTickets(void* p)
{
    Train *pSale=(Train*)p;
    pthread_mutex_lock(&pSale->mutex);
    if(pSale->tickets>0)
    {
        pthread_cond_wait(&pSale->cond,&pSale->mutex);
    }
    pSale->tickets=20;
    pthread_mutex_unlock(&pSale->mutex);
    return NULL;
}
typedef void* (*threadFunc)(void*);
#define N 3
int main()
{
    Train t;
    pthread_t pthId[N];
    int i;
    t.tickets=20;
    threadFunc pthreadFunc[N]={saleWindows1,saleWindows2,setTickets};
    pthread_mutex_init(&t.mutex,NULL);
    pthread_cond_init(&t.cond,NULL);
    for(i=0;i<N;i++)
    {
        pthread_create(pthId+i,NULL,pthreadFunc[i],&t);
    }
    for(i=0;i<N;i++)
    {
        pthread_join(pthId[i],NULL);
    }
    printf("sale over\n");
}

技術分享圖片

Linux系統編程——線程(2)