1. 程式人生 > >用原子操作實現無鎖程式設計

用原子操作實現無鎖程式設計

假設我們要維護一個全域性的執行緒安全的 int 型別變數 count, 下面這兩行程式碼都是很危險的:

count ++;

count += n;

我們知道, 高階語言中的一條語句, 並不是一個原子操作. 比如一個最簡單的自增操作就分為三步: 

1. 從快取取到暫存器
2. 在暫存器加1
3. 存入快取。

多個執行緒訪問同一塊記憶體時, 需要加鎖來保證訪問操作是互斥的. 

所以, 我們可以在操作 count 的時候加一個互斥鎖. 如下面的程式碼:

pthread_mutex_t count_lock = PTHREAD_MUTEX_INITIALIZER;

pthread_mutex_lock(&count_lock);
count++;
pthread_mutex_unlock(&count_lock);

另一個辦法就是, 讓 count++ 和 count+=n 這樣的語句變成原子操作. 一個原子操作必然是執行緒安全的. 有兩種使用原子操作的方式:

1. 使用 gcc 的原子操作

2. 使用 c++11中STL中的 stomic 類的函式

在這裡我只介紹 gcc 裡的原子操作, 這些函式分成以下幾組: type __sync_fetch_and_add (type *ptr, type value, ...) type __sync_fetch_and_sub (type *ptr, type value, ...) type __sync_fetch_and_or (type *ptr, type value, ...) type __sync_fetch_and_and (type *ptr, type value, ...) type __sync_fetch_and_xor (type *ptr, type value, ...) type __sync_fetch_and_nand (type *ptr, type value, ...) 返回更新前的值 type __sync_add_and_fetch (type *ptr, type value, ...) type __sync_sub_and_fetch (type *ptr, type value, ...) type __sync_or_and_fetch (type *ptr, type value, ...) type __sync_and_and_fetch (type *ptr, type value, ...) type __sync_xor_and_fetch (type *ptr, type value, ...) type __sync_nand_and_fetch (type *ptr, type value, ...) 返回更新後的值 bool __sync_bool_compare_and_swap (type *ptr, type oldval type newval, ...)
type __sync_val_compare_and_swap (type *ptr, type oldval type newval, ...) 這兩個函式提供原子的比較和交換,如果*ptr == oldval,就將newval寫入*ptr, 第一個函式在相等並寫入的情況下返回true. 第二個函式在返回操作之前的值。  type __sync_lock_test_and_set (type *ptr, type value, ...) 將*ptr設為value並返回*ptr操作之前的值。 void __sync_lock_release (type *ptr, ...) 將*ptr置0  因為 gcc 具體實現的問題, 後面的可擴充套件引數 (...) 沒有什麼用, 可以省略掉. gcc 保證了這些介面都是原子的. 呼叫這些介面時, 前端序列匯流排會被鎖住, 鎖住了它, 其它 cpu 就不能從儲存器獲取資料. 從而保證對記憶體操作的互斥. 當然, 這種操作是有不小代價, 所以只能在操作小的記憶體才可以這麼做. 上面的介面使用的 type 只能是 1, 2, 4 或 8 位元組的整形, 即:
int8_t / uint8_t int16_t / uint16_t int32_t / uint32_t int64_t / uint64_t 效能上原子操作的速度是互斥鎖的6~7倍。 有了這些函式, 就可以很方便的進行原子操作了, 以 count++ 為例,  count 初始值為0, 可以這麼寫

__sync_fetch_and_add(&count, 1);//返回0, count現在等於1, 類似 count ++

count 初始值為0, 或者這麼寫

__sync_add_and_fetch(&count, 1);//返回1, count現在等於1, 類似 ++ count

原子操作也可以用來實現互斥鎖:

int a = 0; #define LOCK(a) while (__sync_lock_test_and_set(&a,1)) {sched_yield();} #define UNLOCK(a) __sync_lock_release(&a); sched_yield()這個函式可以使用另一個級別等於或高於當前執行緒的執行緒先執行。如果沒有符合條件的執行緒,那麼這個函式將會立刻返回然後繼續執行當前執行緒的程式。   如果去掉 sched_yield(), 這個鎖就會一直自旋. 下面我們利用原子操作來實現一個無鎖併發堆疊; struct Node{     void* data;     Node* next     Node(void* d):data(d),next(NULL){}  }; class Stack{ public:     Stack():top(NULL){}     void Push(void* d);     void* Pop(); private:     Node *top; }; void Stack::Push(void* d){     Node* n = new Node(d);     for (;;){         n->next = top;         if (__sync_bool_compare_and_swap(&top, n->next, n)){             break;         }     } } 壓棧操作首先建立了一個新節點,它的 next 指標指向堆疊的頂部。然後用原子操作把新的節點複製到 top 位置。 從多個執行緒的角度來看,完全可能有兩個或更多執行緒同時試圖把資料壓入堆疊。假設執行緒 A 試圖把 pA 壓入堆疊,執行緒 B 試圖壓入 pB,執行緒 A 先獲得了時間片。在 n->next = top 指令結束之後,排程程式暫停了執行緒 A。現在,執行緒 B 獲得了時間片,它能夠完成原子操作,把 pB 壓入堆疊後結束。接下來,執行緒 A 恢復執行,顯然對於這個執行緒 *top 和 n->next 不匹配,因為執行緒 B 修改了 top 位置的內容。因此,程式碼回到迴圈的開頭,指向正確的 top 指標(執行緒 B 修改後的),呼叫原子操作,把 pA 壓入堆疊後結束。 void* Stack::Pop(){     for (;;){         Node* n = top;         if (n == NULL){             return NULL;         }         if (top != NULL && __sync_bool_compare_and_swap(&top, n, n->next)){             void* p = n->data;             delete n;             return p;         }     } }

出棧操作的原理和壓棧類似. 即使執行緒 B 線上程 A 試圖彈出資料的同時修改了堆疊頂,也可以確保不會跳過堆疊中的元素。