1. 程式人生 > >C++11實現自旋鎖

C++11實現自旋鎖

http://blog.poxiao.me/p/spinlock-implementation-in-cpp11/

自旋鎖(Spinlock)

自旋鎖是一種用於保護多執行緒共享資源的鎖,與一般的互斥鎖(mutex)不同之處在於當自旋鎖嘗試獲取鎖的所有權時會以忙等待(busy waiting)的形式不斷的迴圈檢查鎖是否可用。在多處理器環境中對持有鎖時間較短的程式來說使用自旋鎖代替一般的互斥鎖往往能提高程式的效能。

自旋鎖的原理

自旋鎖有兩種基本狀態:

  1. 鎖定狀態
    鎖定狀態又稱不可用狀態,當自旋鎖被某個執行緒持有時就是鎖定狀態,在自旋鎖被釋放之前其他執行緒不能獲得鎖的所有權。
  2. 可用狀態
    當自選鎖未被任何執行緒持有時的狀態就是可用狀態。

假設某自旋鎖內部使用bool型別的flag變數來標識自旋鎖的狀態。當flag為true表示鎖定狀態,為false表示可用狀態。
獲取自旋鎖的一種可行的流程如下

其中灰色方框中的3步應是一個不可分割的原子操作。
釋放自旋鎖時只需以原子操作的形式將flag置為false。

使用C++11的原子操作實現自旋鎖

C++11提供了對原子操作的支援,其中std::atomic是標準庫提供的一個原子類模板。可以這樣來宣告一個自旋鎖互斥物件spin_mutex。

1
2
3
4
5
6
7
8
9
10
11
#include <atomic>

class spin_mutex {
std::atomic<bool
> flag = ATOMIC_VAR_INIT(false);

public:
spin_mutex() = default;
spin_mutex(const spin_mutex&) = delete;
spin_mutex& operator= (const spin_mutex&) = delete;
void lock(); // 獲取自旋鎖
void unlock(); // 釋放自旋鎖
};

對於lock函式,若根據前面的流程圖來實現,需要CAS(compare and swap)的原子操作,可以使用std::atomic類模板的成員函式compare_exchange_strong。該成員函式的4個過載(overload)宣告如下

1
2
3
4
5
6
7
8
9
10
bool compare_exchange_strong(T& expected, T desired,
std::memory_order success,
std::memory_order failure)
; // overload 1

bool compare_exchange_strong(T& expected, T desired,
std::memory_order success,
std::memory_order failure) volatile
; // overload 2

bool compare_exchange_strong(T& expected, T desired,
std::memory_order order = std::memory_order_seq_cst)
; // overload 3

bool compare_exchange_strong(T& expected, T desired,
std::memory_order order = std::memory_order_seq_cst) volatile
; // overload 4

compare_exchange_strong能夠自動比較*this與expected的值,如果二者相等,會將*this的值修改為desired的值(執行read-modify-write操作),否則將expected的值修改為*this的值。
當*this被改變時compare_exchange_strong返回true,否則返回false。
這4個過載中還有3個型別為std::memory_order的引數,這些引數用於指定記憶體序(memory order),對記憶體序的介紹超出了本文的範圍,之後會在《C++11之多執行緒(五、記憶體序(Memory Order))》中較為詳細的介紹,這裡僅做簡單介紹。
簡單的講就是程式的記憶體訪問順序在編譯期(程式碼優化)和執行期(CPU的亂序執行)可能會被重新排序,這種重新排序往往是為了提高程式的執行速度,並且在單處理器的情況下這種重新排序不會對程式的正確性產生影響,但在多處理器的多執行緒環境中就可能得到非預期的結果。memory order就是用來指定原子操作周圍(指前後)的非原子操作的記憶體訪問如何被排序和同步。std::memory_order_seq_cst是標準庫中原子操作記憶體序的預設值,這是最為嚴格的memory order對效能有一定的損害,若不苛求效能可以總是使用這個值。
簡單介紹完了memory order再回到compare_exchange_strong的函式宣告中來。
在過載1和2中success引數用於指定當比較成功(*this與expected值相等時)執行read-modify-write操作的記憶體序,failure引數用於指定當比較失敗時執行load操作的記憶體序。
在過載3和4中order引數同時指定sccuess和failure時的記憶體序。
下面兩種呼叫方式是等價的。

1
2
flag.compare_exchange_strong(expected, desired, order);
flag.compare_exchange_strong(expected, desired, order, order);

對於unlock函式,可以使用std::atomic類模板的成員函式store來以原子操作的方式將flag置false。

1
2
void store(T desired, memory_order = std::memory_order_seq_cst);
void store(T desired, memory_order = std::memory_order_seq_cst) volatile;

使用預設記憶體序的自旋鎖完整的實現如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
class spin_mutex {
std::atomic<bool> flag = ATOMIC_VAR_INIT(false);
public:
spin_mutex() = default;
spin_mutex(const spin_mutex&) = delete;
spin_mutex& operator= (const spin_mutex&) = delete;
void lock() {
bool expected = false;
while(!flag.compare_exchange_strong(expected, true))
expected = false;
}
void unlock() {
flag.store(false);
}
};

下面的程式碼演示了使用自旋鎖保護變數num

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
int num = 0;
spin_mutex sm;

void thread_proc()
{


for(int i = 0; i < 100000; ++i) {
sm.lock();
++num;
sm.unlock();
}
}

int main()
{

std::thread td1(thread_proc), td2(thread_proc);
td1.join();
td2.join();
std::cout << num << std::endl;
return 0;
}

指定記憶體序提高效能

通過指定記憶體序代替預設的std::memory_order_seq_cst可以提高效能。C++11的std::memory_order有6個列舉值。

Value 描述
std::memory_order_relaxed 沒有任何同步和排序限制,只需保證操作是原子的。
std::memory_order_consume 一個指定了此值的load操作在受影響的記憶體位置上執行consume操作,此操作使得另一個在同一記憶體位置執行了release操作的執行緒在此之前對資料依賴(data-dependent)的記憶體位置的寫操作為當前執行緒可見。
std::memory_order_acquire 一個指定了此值的load操作在受影響的記憶體位置上執行acquire操作,此操作使得另一個在同一記憶體位置執行了release操作的執行緒在此之前對任意的記憶體位置的寫操作為當前執行緒可見。
std::memory_order_release 一個指定了此值的store操作在受影響的記憶體位置上執行release操作,此操作使得此執行緒之前對資料依賴(data-dependent)的記憶體位置的寫操作為另一個在此之後通過對同一記憶體位置執行consume操作的執行緒可見,使得此執行緒之前對任意的記憶體位置的寫操作為另一個在此之後通過對同一個記憶體位置執行acquire操作的執行緒可見。
std::memory_order_acq_rel 一個指定了此值的read-modify-write操作,在讀階段(相當於load)對受影響的記憶體位置執行acquire操作,在寫階段(相當於store)對同一記憶體位置執行release操作。
std::memory_order_seq_cst 順序一致性,所有的執行緒觀察到的整個程式中記憶體修改順序是一致的。

在指定memory order前需要明確自旋鎖的責任:自旋鎖除了要避免多執行緒重入,還要保證一個執行緒在持有自旋鎖期間對記憶體的寫操作要能夠被另一個執行緒在獲得自旋鎖的時候可觀測到(可見)。
對照前面memory order列舉值的表,優先考慮效能。可以在unlock中對flag呼叫store置false的執行release操作。而在lock中對flag呼叫compare_exchange_strong的時候執行acquire操作。
使用Rlease-Acquire記憶體序的自旋鎖的完整實現如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
class spin_mutex {
std::atomic<bool> flag = ATOMIC_VAR_INIT(false);
public:
spin_mutex() = default;
spin_mutex(const spin_mutex&) = delete;
spin_mutex& operator= (const spin_mutex&) = delete;
void lock() {
bool expected = false;
while(!flag.compare_exchange_strong(expected, true, std::memory_order_acquire))
expected = false;
}
void unlock() {
flag.store(false, std::memory_order_release);
}
};

其中flag.compare_exchange_strong(expected, true, std::memory_order_acquire)等價於flag.compare_exchange_strong(expected, true, std::memory_order_acquire, std::memory_order_acquire)。獲得自旋鎖的所有權的充分必要條件是當且僅當compare_exchange_strong比較成功且成功改變flag的值為true,也就是說當比較失敗時是沒有必要執行acquire操作的。
於是lock函式可以再修改成

1
2
3
4
5
void lock() {
bool expected = false;
while(!flag.compare_exchange_strong(expected, true, std::memory_order_acquire, std::memory_order_relaxed))
expected = false;
}

不使用CAS(compare-and-swap)的實現

前面的CAS實現中lock函式中的原子操作需要3個步驟:

  1. 取flag的值;
  2. 比較flag的值;
  3. 置flag一個新值。

由於自旋鎖只有兩個狀態,事實上獲取自旋鎖可以使用下面的流程


其中灰色矩形中的2步應是一個不可分割的原子操作:

  1. 取flag的值;
  2. 置flag一個新值。

這樣就將原子操作中需要做的3步減少到2步了。
std::atomic類模板的exchange成員函式能夠以原子的方式對其進行賦值並返回舊值。

1
2
T exchange(T desired, memory_order = std::memory_order_seq_cst);
T exchange(T desired, memory_order = std::memory_order_seq_cst) volatile;

使用exchange實現的自旋鎖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
class spin_mutex {
std::atomic<bool> flag = ATOMIC_VAR_INIT(false);
public:
spin_mutex() = default;
spin_mutex(const spin_mutex&) = delete;
spin_mutex& operator= (const spin_mutex&) = delete;
void lock() {
while(flag.exchange(true, std::memory_order_acquire))
;
}
void unlock() {
flag.store(false, std::memory_order_release);
}
};

使用std::atomic_flag的實現

C++11並不要求std::atomic的實現必須是無鎖的(lock-free),可以通過使用std::atomic類模板的成員函式is_lock_free來檢查atomic物件是不是無鎖的。如果自旋鎖內部的flag不是無鎖的型別那麼這個自旋鎖就沒有存在的意義了。所幸C++11提供了一個無鎖的二值(bool)原子型別std::atomic_flag。使用std::atomic_flag就可以實現一個真正有用的自旋鎖了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
class spin_mutex {
std::atomic_flag flag = ATOMIC_FLAG_INIT;
public:
spin_mutex() = default;
spin_mutex(const spin_mutex&) = delete;
spin_mutex& operator= (const spin_mutex&) = delete;
void lock() {
while(flag.test_and_set(std::memory_order_acquire))
;
}
void unlock() {
flag.clear(std::memory_order_release);
}
};

與標準庫的std::mutex一樣這裡實現的spin_mutex同樣不建議直接去呼叫lock和unlock函式,而推薦使用std::lock_guard來自動管理自旋鎖。完整的實現和使用程式碼如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
#include <atomic>
#include <thread>
#include <mutex>
#include <iostream>

class spin_mutex {
std::atomic_flag flag = ATOMIC_FLAG_INIT;
public:
spin_mutex() = default;
spin_mutex(const spin_mutex&) = delete;
spin_mutex& operator= (const spin_mutex&) = delete;
void lock() {
while(flag.test_and_set(std::memory_order_acquire))
;
}
void unlock() {
flag.clear(std::memory_order_release);
}
};

int num = 0;
spin_mutex sm;

void thread_proc()
{


for(int i = 0; i < 100000; ++i) {
std::lock_guard<spin_mutex> lock(sm);
++num;
}
}

int main()
{

std::thread td1(thread_proc), td2(thread_proc);
td1.join();
td2.join();
std::cout << num << std::endl;
return 0;
}