1. 程式人生 > >併發無鎖佇列學習(資料結構)

併發無鎖佇列學習(資料結構)

提到平行計算通常都會想到加鎖,事實卻並非如此,大多數併發是不需要加鎖的。比如在不同電腦上執行的程式碼編輯器,兩者併發執行不需要加鎖。在一臺電腦上同時執行的媒體播放放器和程式碼編輯器,兩者併發執行不需要加鎖(當然系統呼叫和程序排程是要加鎖的)。在同一個程序中執行多個執行緒,如果各自處理獨立的事情也不需要加鎖(當然系統呼叫、程序排程和記憶體分配是要加鎖的)。在以上這些情況裡,各個併發實體之間沒有共享資料,所以雖然併發執行但不需要加鎖。

多執行緒併發執行時,雖然有共享資料,如果所有執行緒只是讀取共享資料而不修改它,也是不用加鎖的,比如程式碼段就是共享的“資料”,每個執行緒都會讀取,但是不用加鎖。排除所有這些情況,多執行緒之間有共享資料,有的執行緒要修改這些共享資料,有的執行緒要讀取這些共享資料,這才是程式設計師需要關注的情況,也是本節我們討論的範圍。

在併發的環境裡,加鎖可以保護共享的資料,但是加鎖也會存在一些問題:

o 由於臨界區無法併發執行,進入臨界區就需要等待,加鎖帶來效率的降低。

o 在複雜的情況下,很容易造成死鎖,併發實體之間無止境的互相等待。

o 在中斷/訊號處理函式中不能加鎖,給併發處理帶來困難。

o 優先順序倒置造成實時系統不能正常工作。低階優先程序拿到高優先順序程序需要的鎖,結果是高/低優先順序的程序都無法執行,中等優先順序的程序可能在狂跑。

由於併發與加鎖(互斥)的矛盾關係,無鎖資料結構自然成為程式設計師關注的焦點,這也是本節要介紹的:

o CPU提供的原子操作。

大約在七八年前,我們用apache的xerces來解析XML檔案,奇怪的是多執行緒反而比單執行緒慢。他們找了很久也沒有找出原因,只是證實使用多程序代替多執行緒會快一個數量級,在Windows上他們就使用了多程序的方式。後來移植到linux時候,我發現xerces每建立一個結點都會去更新一些全域性的統計資訊,比如把結點的總數加一,它使用的pthread_mutex實現互斥。這就是問題所在:一個XML文件有數以萬計的結點,以50個執行緒併發為例,每個執行緒解析一個XML文件,總共要進行上百萬次的加鎖/解鎖,幾乎所有執行緒都在等待,你說能快得了嗎?

當時我知道Windows下有InterlockedIncrement之類的函式,它們利用CPU一些特殊指令,保證對整數的基本操作是原子的。查找了一些資源發現Linux下也有類似的函式,後來我把所有加鎖去掉,換成這些原子操作,速度比多程序執行還快了幾倍。下面我們看++和—的原子操作在IA架構上的實現:

#define ATOMIC_SMP_LOCK "lock ; "
typedef struct { volatile int counter; } atomic_t;

static __inline__ void atomic_inc(atomic_t *v)
{
    __asm__ __volatile__(
        ATOMIC_SMP_LOCK "incl %0"
:"=m" (v->counter) :"m" (v->counter)); } static __inline__ void atomic_dec(atomic_t *v) { __asm__ __volatile__( ATOMIC_SMP_LOCK "decl %0" :"=m" (v->counter) :"m" (v->counter)); }

o 單入單出的迴圈佇列。單入單出的迴圈佇列是一種特殊情況,雖然特殊但是很實用,重要的是它不需要加鎖。這裡的單入是指只有一個執行緒向佇列裡追加資料(push),單出只是指只有一個執行緒從佇列裡取資料(pop),迴圈佇列與普通佇列相比,不同之處在於它的最大資料儲存量是事先固定好的,不能動態增長。儘管有這些限制它的應用還是相當廣泛的。這我們介紹一下它的實現:

資料下定義如下:

typedef struct _FifoRing
{
    int r_cursor;
    int w_cursor;
    size_t length;
    void* data[0];

}FifoRing;

r_cursor指向佇列頭,用於取資料(pop)。w_cursor指向佇列尾,用於追加資料(push)。length表示佇列的最大資料儲存量,data表示存放的資料,[0]在這裡表示變長的緩衝區(前面我們已經講過)。

建立函式

FifoRing* fifo_ring_create(size_t length)
{
    FifoRing* thiz = NULL;

    return_val_if_fail(length > 1, NULL);

    thiz = (FifoRing*)malloc(sizeof(FifoRing) + length * sizeof(void*));

    if(thiz != NULL)
    {
        thiz->r_cursor = 0;
        thiz->w_cursor = 0;
        thiz->length   = length;
    }

    return thiz;
}

這裡我們要求佇列的長度大於1而不是大於0,為什麼呢?排除長度為1的佇列沒有什麼意義的原因外,更重要的原因是佇列頭與佇列尾重疊 (r_cursor= =w_cursor) 時,到底表示是滿佇列還是空佇列?這個要搞清楚才行,上次一個同事犯了這個錯誤,讓我們查了很久。這裡我們認為佇列頭與佇列尾重疊時表示佇列為空,這與佇列初始狀態一致,後面在寫的時候始終保留一個空位,避免佇列頭與佇列尾重疊,這樣可以消除歧義了。

追加資料(push)

Ret fifo_ring_push(FifoRing* thiz, void* data)
{
    int w_cursor = 0;
    Ret ret = RET_FAIL;
    return_val_if_fail(thiz != NULL, RET_FAIL);

    w_cursor = (thiz->w_cursor + 1) % thiz->length;

    if(w_cursor != thiz->r_cursor)
    {
        thiz->data[thiz->w_cursor] = data;
        thiz->w_cursor = w_cursor;

        ret = RET_OK;
    }

    return ret;
}

佇列頭和佇列尾之間還有一個以上的空位時就追加資料,否則返回失敗。

取資料(pop)

Ret fifo_ring_pop(FifoRing* thiz, void** data)
{
    Ret ret = RET_FAIL;
    return_val_if_fail(thiz != NULL && data != NULL, RET_FAIL);

    if(thiz->r_cursor != thiz->w_cursor)
    {
        *data = thiz->data[thiz->r_cursor];
        thiz->r_cursor = (thiz->r_cursor + 1)%thiz->length;

        ret = RET_OK;
    }

    return ret;
}

佇列頭和佇列尾不重疊表示佇列不為空,取資料並移動佇列頭。

o 單寫多讀的無鎖資料結構。單寫表示只有一個執行緒去修改共享資料結構,多讀表示有多個執行緒去讀取共享資料結構。前面介紹的讀寫鎖可以有效的解決這個問題,但更高效的辦法是使用無鎖資料結構。思路如下:

就像為了避免顯示閃爍而使用的雙緩衝一樣,我們使用兩份資料結構,一份資料結構用於讀取,所有執行緒都可以在不加鎖的情況下讀取這個資料結構。另外一份資料結構用於修改,由於只有一個執行緒會修改它,所以也不用加鎖。

在修改之後,我們再交換讀/寫的兩個函式結構,把另外一份也修改過來,這樣兩個資料結構就一致了。在交換時要保證沒有執行緒在讀取,所以我們還需要一個讀執行緒的引用計數。現在我們看看怎麼把前面寫的雙向連結串列改為單寫多讀的無鎖資料結構。

為了保證交換是原子的,我們需要一個新的原子操作

//CAS(compare and swap)。

#define CAS(_a, _o, _n)                                    /
({ __typeof__(_o) __o = _o;                                /
   __asm__ __volatile__(                                   /
       "lock cmpxchg %3,%1"                                /
       : "=a" (__o), "=m" (*(volatile unsigned int *)(_a)) /
       :  "0" (__o), "r" (_n) );                           /
   __o;                                                    /
})

資料結構

typedef struct _SwmrDList
{
    atomic_t rd_index_and_ref;
    DList* dlists[2];
}SwmrDList;

兩個連結串列,一個用於讀一個用於寫。rd_index_and_ref的最高位元組記錄用於讀取的雙向連結串列的索引,低24位用於記錄讀取執行緒的引用記數,最大支援16777216個執行緒同時讀取,應該是足夠了,所以後面不考慮它的溢位。

讀取操作

int      swmr_dlist_find(SwmrDList* thiz, DListDataCompareFunc cmp, void* ctx)
{
    int ret = 0;
    return_val_if_fail(thiz != NULL && thiz->dlists != NULL, -1);

    atomic_inc(&(thiz->rd_index_and_ref));
    size_t rd_index = (thiz->rd_index_and_ref.counter>>24) & 0x1;
    ret = dlist_find(thiz->dlists[rd_index], cmp, ctx);
    atomic_dec(&(thiz->rd_index_and_ref));

    return ret;
}

修改操作

Ret swmr_dlist_insert(SwmrDList* thiz, size_t index, void* data)
{
    Ret ret = RET_FAIL;
    DList* wr_dlist = NULL;
    return_val_if_fail(thiz != NULL && thiz->dlists != NULL, ret);

    size_t wr_index = !((thiz->rd_index_and_ref.counter>>24) & 0x1);
    if((ret = dlist_insert(thiz->dlists[wr_index], index, data)) == RET_OK)
    {
        int rd_index_old = thiz->rd_index_and_ref.counter & 0xFF000000;
        int rd_index_new = wr_index << 24;

        do
        {
            usleep(100);
        }while(CAS(&(thiz->rd_index_and_ref), rd_index_old, rd_index_new));

        wr_index = rd_index_old>>24;
        ret = dlist_insert(thiz->dlists[wr_index], index, data);
    }

    return ret;
}

先修改用於修改的雙向連結串列,修改完成之後等到沒有執行緒讀取時,交換讀/寫兩個連結串列,再修改另一個連結串列,此時兩個連結串列狀態保持一致。

稍做改進,對修改的操作進行加鎖,就可以支援多讀多寫的資料結構,讀是無鎖的,寫是加鎖的。

o 真正的無鎖資料結構。Andrei Alexandrescu的《Lock-FreeDataStructures》估計是這方面最經典的論文了,對他的方法我開始感到驚奇後來感到失望,驚奇的是演算法的巧妙,失望的是無鎖的限制和代價。作者最後說這種資料結構只適用於WRRMBNTM(Write-Rarely-Read-Many -But-Not-Too-Many)的情況。而且每次修改都要拷貝整個資料結構(甚至多次),所以不要指望這種方法能帶來多少效能上的提高,唯一的好處是能避免加鎖帶來的部分副作用。有興趣的朋友可以看下這篇論文,這裡我就不重複了。