C++11 併發指南六( 型別詳解二 std::atomic )
C++11 併發指南六(atomic 型別詳解一 atomic_flag 介紹) 一文介紹了 C++11 中最簡單的原子型別 std::atomic_flag,但是 std::atomic_flag 過於簡單,只提供了 test_and_set 和 clear 兩個 API,不能滿足其他需求(如 store, load, exchange, compare_exchange 等),因此本文將介紹功能更加完善的 std::atomic 類。
std::atomic 基本介紹
std::atomic 是模板類,一個模板型別為 T 的原子物件中封裝了一個型別為 T 的值。
template <classT> struct atomic;
原子型別物件的主要特點就是從不同執行緒訪問不會導致資料競爭(data race)。因此從不同執行緒訪問某個原子物件是良性 (well-defined) 行為,而通常對於非原子型別而言,併發訪問某個物件(如果不做任何同步操作)會導致未定義 (undifined) 行為發生。
C++11 標準中的基本 std::atomic 模板定義如下:
template < class T > struct atomic { bool is_lock_free() const volatile; bool is_lock_free() const; void store(T, memory_order = memory_order_seq_cst) volatile; void store(T, memory_order = memory_order_seq_cst); T load(memory_order = memory_order_seq_cst) const volatile; T load(memory_order = memory_order_seq_cst) const; operator T() const volatile; operator T() const; T exchange(T, memory_order = memory_order_seq_cst) volatile; T exchange(T, memory_order = memory_order_seq_cst); bool compare_exchange_weak(T &, T, memory_order, memory_order) volatile; bool compare_exchange_weak(T &, T, memory_order, memory_order); bool compare_exchange_strong(T &, T, memory_order, memory_order) volatile; bool compare_exchange_strong(T &, T, memory_order, memory_order); bool compare_exchange_weak(T &, T, memory_order = memory_order_seq_cst) volatile; bool compare_exchange_weak(T &, T, memory_order = memory_order_seq_cst); bool compare_exchange_strong(T &, T, memory_order = memory_order_seq_cst) volatile; bool compare_exchange_strong(T &, T, memory_order = memory_order_seq_cst); atomic() = default; constexpr atomic(T); atomic(const atomic &) = delete; atomic & operator=(const atomic &) = delete; atomic & operator=(const atomic &) volatile = delete; T operator=(T) volatile; T operator=(T); };
另外,C++11 標準庫 std::atomic 提供了針對整形(integral)和指標型別的特化實現,分別定義如下:
針對整形(integal)的特化,其中 integal 代表瞭如下型別char, signed char, unsigned char, short, unsigned short, int, unsigned int, long, unsigned long, long long, unsigned long long, char16_t, char32_t, wchar_t:
template <> struct atomic<integral> { bool is_lock_free() const volatile; bool is_lock_free() const; void store(integral, memory_order = memory_order_seq_cst) volatile; void store(integral, memory_order = memory_order_seq_cst); integral load(memory_order = memory_order_seq_cst) const volatile; integral load(memory_order = memory_order_seq_cst) const; operator integral() const volatile; operator integral() const; integral exchange(integral, memory_order = memory_order_seq_cst) volatile; integral exchange(integral, memory_order = memory_order_seq_cst); bool compare_exchange_weak(integral&, integral, memory_order, memory_order) volatile; bool compare_exchange_weak(integral&, integral, memory_order, memory_order); bool compare_exchange_strong(integral&, integral, memory_order, memory_order) volatile; bool compare_exchange_strong(integral&, integral, memory_order, memory_order); bool compare_exchange_weak(integral&, integral, memory_order = memory_order_seq_cst) volatile; bool compare_exchange_weak(integral&, integral, memory_order = memory_order_seq_cst); bool compare_exchange_strong(integral&, integral, memory_order = memory_order_seq_cst) volatile; bool compare_exchange_strong(integral&, integral, memory_order = memory_order_seq_cst); integral fetch_add(integral, memory_order = memory_order_seq_cst) volatile; integral fetch_add(integral, memory_order = memory_order_seq_cst); integral fetch_sub(integral, memory_order = memory_order_seq_cst) volatile; integral fetch_sub(integral, memory_order = memory_order_seq_cst); integral fetch_and(integral, memory_order = memory_order_seq_cst) volatile; integral fetch_and(integral, memory_order = memory_order_seq_cst); integral fetch_or(integral, memory_order = memory_order_seq_cst) volatile; integral fetch_or(integral, memory_order = memory_order_seq_cst); integral fetch_xor(integral, memory_order = memory_order_seq_cst) volatile; integral fetch_xor(integral, memory_order = memory_order_seq_cst); atomic() = default; constexpr atomic(integral); atomic(const atomic&) = delete; atomic& operator=(const atomic&) = delete; atomic& operator=(const atomic&) volatile = delete; integral operator=(integral) volatile; integral operator=(integral); integral operator++(int) volatile; integral operator++(int); integral operator--(int) volatile; integral operator--(int); integral operator++() volatile; integral operator++(); integral operator--() volatile; integral operator--(); integral operator+=(integral) volatile; integral operator+=(integral); integral operator-=(integral) volatile; integral operator-=(integral); integral operator&=(integral) volatile; integral operator&=(integral); integral operator|=(integral) volatile; integral operator|=(integral); integral operator^=(integral) volatile; integral operator^=(integral); };
針對指標的特化:
template <class T> struct atomic<T*> { bool is_lock_free() const volatile; bool is_lock_free() const; void store(T*, memory_order = memory_order_seq_cst) volatile; void store(T*, memory_order = memory_order_seq_cst); T* load(memory_order = memory_order_seq_cst) const volatile; T* load(memory_order = memory_order_seq_cst) const; operator T*() const volatile; operator T*() const; T* exchange(T*, memory_order = memory_order_seq_cst) volatile; T* exchange(T*, memory_order = memory_order_seq_cst); bool compare_exchange_weak(T*&, T*, memory_order, memory_order) volatile; bool compare_exchange_weak(T*&, T*, memory_order, memory_order); bool compare_exchange_strong(T*&, T*, memory_order, memory_order) volatile; bool compare_exchange_strong(T*&, T*, memory_order, memory_order); bool compare_exchange_weak(T*&, T*, memory_order = memory_order_seq_cst) volatile; bool compare_exchange_weak(T*&, T*, memory_order = memory_order_seq_cst); bool compare_exchange_strong(T*&, T*, memory_order = memory_order_seq_cst) volatile; bool compare_exchange_strong(T*&, T*, memory_order = memory_order_seq_cst); T* fetch_add(ptrdiff_t, memory_order = memory_order_seq_cst) volatile; T* fetch_add(ptrdiff_t, memory_order = memory_order_seq_cst); T* fetch_sub(ptrdiff_t, memory_order = memory_order_seq_cst) volatile; T* fetch_sub(ptrdiff_t, memory_order = memory_order_seq_cst); atomic() = default; constexpr atomic(T*); atomic(const atomic&) = delete; atomic& operator=(const atomic&) = delete; atomic& operator=(const atomic&) volatile = delete; T* operator=(T*) volatile; T* operator=(T*); T* operator++(int) volatile; T* operator++(int); T* operator--(int) volatile; T* operator--(int); T* operator++() volatile; T* operator++(); T* operator--() volatile; T* operator--(); T* operator+=(ptrdiff_t) volatile; T* operator+=(ptrdiff_t); T* operator-=(ptrdiff_t) volatile; T* operator-=(ptrdiff_t); };
std::atomic 成員函式
好了,對 std::atomic 有了一個最基本認識之後我們來看 std::atomic 的成員函式吧。
std::atomic 建構函式
std::atomic 的建構函式如下:
default (1) |
atomic() noexcept = default; |
---|---|
initialization (2) |
constexpr atomic (T val) noexcept; |
copy [deleted] (3) |
atomic (const atomic&) = delete; |
- 預設建構函式,由預設建構函式建立的 std::atomic 物件處於未初始化(uninitialized)狀態,對處於未初始化(uninitialized)狀態 std::atomic物件可以由 atomic_init 函式進行初始化。
- 初始化建構函式,由型別 T初始化一個 std::atomic物件。
- 拷貝建構函式被禁用。
請看下例:
#include <iostream> // std::cout #include <atomic> // std::atomic, std::atomic_flag, ATOMIC_FLAG_INIT #include <thread> // std::thread, std::this_thread::yield #include <vector> // std::vector // 由 false 初始化一個 std::atomic<bool> 型別的原子變數 std::atomic<bool> ready(false); std::atomic_flag winner = ATOMIC_FLAG_INIT; void do_count1m(int id) { while (!ready) { std::this_thread::yield(); } // 等待 ready 變為 true. for (volatile int i=0; i<1000000; ++i) {} // 計數 if (!winner.test_and_set()) { std::cout << "thread #" << id << " won!\n"; } } int main () { std::vector<std::thread> threads; std::cout << "spawning 10 threads that count to 1 million...\n"; for (int i=1; i<=10; ++i) threads.push_back(std::thread(count1m,i)); ready = true; for (auto& th : threads) th.join(); return 0; }
std::atomic::operator=() 函式
std::atomic 的賦值操作函式定義如下:
set value (1) |
T operator= (T val) noexcept; T operator= (T val) volatile noexcept; |
---|---|
copy [deleted] (2) |
atomic& operator= (const atomic&) = delete; atomic& operator= (const atomic&) volatile = delete; |
可以看出,普通的賦值拷貝操作已經被禁用。但是一個型別為 T 的變數可以賦值給相應的原子型別變數(相當與隱式轉換),該操作是原子的,記憶體序(Memory Order) 預設為順序一致性(std::memory_order_seq_cst),如果需要指定其他的記憶體序,需使用 std::atomic::store()。
#include <iostream> // std::cout #include <atomic> // std::atomic #include <thread> // std::thread, std::this_thread::yield std::atomic <int> foo = 0; void set_foo(int x) { foo = x; // 呼叫 std::atomic::operator=(). } void print_foo() { while (foo == 0) { // wait while foo == 0 std::this_thread::yield(); } std::cout << "foo: " << foo << '\n'; } int main() { std::thread first(print_foo); std::thread second(set_foo, 10); first.join(); second.join(); return 0; }
基本 std::atomic 型別操作
本節主要介紹基本 std::atomic 型別所具備的操作(即成員函式)。我們知道 std::atomic 是模板類,一個模板型別為 T 的原子物件中封裝了一個型別為 T 的值。本文<std::atomic 基本介紹>一節中也提到了 std::atomic 類模板除了基本型別以外,還針對整形和指標型別做了特化。 特化的 std::atomic 型別支援更多的操作,如 fetch_add, fetch_sub, fetch_and 等。本小節介紹基本 std::atomic 型別所具備的操作:
- is_lock_free
bool is_lock_free() const volatile noexcept; bool is_lock_free() const noexcept;
- 判斷該 std::atomic 物件是否具備 lock-free 的特性。如果某個物件滿足 lock-free 特性,在多個執行緒訪問該物件時不會導致執行緒阻塞。(可能使用某種事務記憶體transactional memory 方法實現 lock-free 的特性)。
- store
void store (T val, memory_order sync = memory_order_seq_cst) volatile noexcept; void store (T val, memory_order sync = memory_order_seq_cst) noexcept;
- 修改被封裝的值,std::atomic::store 函式將型別為 T 的引數 val 複製給原子物件所封裝的值。T 是 std::atomic 類模板引數。另外引數 sync 指定記憶體序(Memory Order),可能的取值如下:
Memory Order 值 | Memory Order 型別 |
---|---|
memory_order_relaxed | Relaxed |
memory_order_release | Release |
memory_order_seq_cst | Sequentially consistent |
- 請看下面例子:
#include <iostream> // std::cout #include <atomic> // std::atomic, std::memory_order_relaxed #include <thread> // std::thread std::atomic<int> foo(0); // 全域性的原子物件 foo void set_foo(int x) { foo.store(x, std::memory_order_relaxed); // 設定(store) 原子物件 foo 的值 } void print_foo() { int x; do { x = foo.load(std::memory_order_relaxed); // 讀取(load) 原子物件 foo 的值 } while (x == 0); std::cout << "foo: " << x << '\n'; } int main () { std::thread first(print_foo); // 執行緒 first 列印 foo 的值 std::thread second(set_foo, 10); // 執行緒 second 設定 foo 的值 first.join(); second.join(); return 0; }
- load
T load (memory_order sync = memory_order_seq_cst) const volatile noexcept; T load (memory_order sync = memory_order_seq_cst) const noexcept;
- 讀取被封裝的值,引數 sync 設定記憶體序(Memory Order),可能的取值如下:
Memory Order 值 | Memory Order 型別 |
---|---|
memory_order_relaxed | Relaxed |
memory_order_consume | Consume |
memory_order_acquire | Acquire |
memory_order_seq_cst | Sequentially consistent |
- 請看下面例子:
#include <iostream> // std::cout #include <atomic> // std::atomic, std::memory_order_relaxed #include <thread> // std::thread std::atomic<int> foo(0); // 全域性的原子物件 foo void set_foo(int x) { foo.store(x, std::memory_order_relaxed); // 設定(store) 原子物件 foo 的值 } void print_foo() { int x; do { x = foo.load(std::memory_order_relaxed); // 讀取(load) 原子物件 foo 的值 } while (x == 0); std::cout << "foo: " << x << '\n'; } int main () { std::thread first(print_foo); // 執行緒 first 列印 foo 的值 std::thread second(set_foo, 10); // 執行緒 second 設定 foo 的值 first.join(); second.join(); return 0; }
- operator T
operator T() const volatile noexcept; operator T() const noexcept;
- 與 load 功能類似,也是讀取被封裝的值,operator T() 是型別轉換(type-cast)操作,預設的記憶體序是 std::memory_order_seq_cst,如果需要指定其他的記憶體序,你應該使用 load() 函式。請看下面例子:
#include <iostream> // std::cout #include <atomic> // std::atomic #include <thread> // std::thread, std::this_thread::yield std::atomic<int> foo = 0; std::atomic<int> bar = 0; void set_foo(int x) { foo = x; } void copy_foo_to_bar() { // 如果 foo == 0,則該執行緒 yield, // 在 foo == 0 時, 實際也是隱含了型別轉換操作, // 因此也包含了 operator T() const 的呼叫. while (foo == 0) std::this_thread::yield(); // 實際呼叫了 operator T() const, 將foo 強制轉換成 int 型別, // 然後呼叫 operator=(). bar = static_cast<int>(foo); } void print_bar() { // 如果 bar == 0,則該執行緒 yield, // 在 bar == 0 時, 實際也是隱含了型別轉換操作, // 因此也包含了 operator T() const 的呼叫. while (bar == 0) std::this_thread::yield(); std::cout << "bar: " << bar << '\n'; } int main () { std::thread first(print_bar); std::thread second(set_foo, 10); std::thread third(copy_foo_to_bar); first.join(); second.join(); third.join(); return 0; }
- exchange
T exchange (T val, memory_order sync = memory_order_seq_cst) volatile noexcept; T exchange (T val, memory_order sync = memory_order_seq_cst) noexcept;
- 讀取並修改被封裝的值,exchange 會將 val 指定的值替換掉之前該原子物件封裝的值,並返回之前該原子物件封裝的值,整個過程是原子的(因此exchange 操作也稱為 read-modify-write 操作)。sync引數指定記憶體序(Memory Order),可能的取值如下:
Memory Order 值 | Memory Order 型別 |
---|---|
memory_order_relaxed | Relaxed |
memory_order_consume | Consume |
memory_order_acquire | Acquire |
memory_order_release | Release |
memory_order_acq_rel | Acquire/Release |
memory_order_seq_cst | Sequentially consistent |
請看下面例子,各個執行緒計數至 1M,首先完成計數任務的執行緒列印自己的 ID,
#include <iostream> // std::cout #include <atomic> // std::atomic #include <thread> // std::thread #include <vector> // std::vector std::atomic<bool> ready(false); std::atomic<bool> winner(false); void count1m (int id) { while (!ready) {} // wait for the ready signal for (int i = 0; i < 1000000; ++i) {} // go!, count to 1 million if (!winner.exchange(true)) { std::cout << "thread #" << id << " won!\n"; } }; int main () { std::vector<std::thread> threads; std::cout << "spawning 10 threads that count to 1 million...\n"; for (int i = 1; i <= 10; ++i) threads.push_back(std::thread(count1m,i)); ready = true; for (auto& th : threads) th.join(); return 0; }
- compare_exchange_weak
(1) |
bool compare_exchange_weak (T& expected, T val, memory_order sync = memory_order_seq_cst) volatile noexcept; bool compare_exchange_weak (T& expected, T val, memory_order sync = memory_order_seq_cst) noexcept; |
---|---|
(2) |
bool compare_exchange_weak (T& expected, T val, memory_order success, memory_order failure) volatile noexcept; bool compare_exchange_weak (T& expected, T val, memory_order success, memory_order failure) noexcept; |
- 比較並交換被封裝的值(weak)與引數 expected 所指定的值是否相等,如果:
- 相等,則用 val 替換原子物件的舊值。
- 不相等,則用原子物件的舊值替換 expected ,因此呼叫該函式之後,如果被該原子物件封裝的值與引數 expected 所指定的值不相等,expected 中的內容就是原子物件的舊值。
- 該函式通常會讀取原子物件封裝的值,如果比較為 true(即原子物件的值等於 expected),則替換原子物件的舊值,但整個操作是原子的,在某個執行緒讀取和修改該原子物件時,另外的執行緒不能對讀取和修改該原子物件。
在第(2)種情況下,記憶體序(Memory Order)的選擇取決於比較操作結果,如果比較結果為 true(即原子物件的值等於 expected),則選擇引數 success 指定的記憶體序,否則選擇引數 failure 所指定的記憶體序。
注意,該函式直接比較原子物件所封裝的值與引數 expected 的物理內容,所以某些情況下,物件的比較操作在使用 operator==() 判斷時相等,但 compare_exchange_weak 判斷時卻可能失敗,因為物件底層的物理內容中可能存在位對齊或其他邏輯表示相同但是物理表示不同的值(比如 true 和 2 或 3,它們在邏輯上都表示"真",但在物理上兩者的表示並不相同)。
與compare_exchange_strong 不同, weak 版本的 compare-and-exchange 操作允許(spuriously 地)返回 false(即原子物件所封裝的值與引數 expected 的物理內容相同,但卻仍然返回 false),不過在某些需要迴圈操作的演算法下這是可以接受的,並且在一些平臺下 compare_exchange_weak 的效能更好 。如果 compare_exchange_weak 的判斷確實發生了偽失敗(spurious failures)——即使原子物件所封裝的值與引數 expected 的物理內容相同,但判斷操作的結果卻為 false,compare_exchange_weak函式返回 false,並且引數 expected 的值不會改變。
對於某些不需要採用迴圈操作的演算法而言, 通常採用compare_exchange_strong 更好。另外,該函式的記憶體序由 sync 引數指定,可選條件如下:
Memory Order 值 | Memory Order 型別 |
---|---|
memory_order_relaxed | Relaxed |
memory_order_consume | Consume |
memory_order_acquire | Acquire |
memory_order_release | Release |
memory_order_acq_rel | Acquire/Release |
memory_order_seq_cst | Sequentially consistent |
- 請看下面的例子(參考):
#include <iostream> // std::cout #include <atomic> // std::atomic #include <thread> // std::thread #include <vector> // std::vector // a simple global linked list: struct Node { int value; Node* next; }; std::atomic<Node*> list_head(nullptr); void append(int val) { // append an element to the list Node* newNode = new Node{val, list_head}; // next is the same as: list_head = newNode, but in a thread-safe way: while (!list_head.compare_exchange_weak(newNode->next,newNode)) {} // (with newNode->next updated accordingly if some other thread just appended another node) } int main () { // spawn 10 threads to fill the linked list: std::vector<std::thread> threads; for (int i = 0; i < 10; ++i) threads.push_back(std::thread(append, i)); for (auto& th : threads) th.join(); // print contents: for (Node* it = list_head; it!=nullptr; it=it->next) std::cout << ' ' << it->value; std::cout << '\n'; // cleanup: Node* it; while (it=list_head) {list_head=it->next; delete it;} return 0; }
- 可能的執行結果如下:
9 8 7 6 5 4 3 2 1 0
- compare_exchange_strong
(1) |
bool compare_exchange_strong (T& expected, T val, memory_order sync = memory_order_seq_cst) volatile noexcept; bool compare_exchange_strong (T& expected, T val, memory_order sync = memory_order_seq_cst) noexcept; |
---|---|
(2) |
bool compare_exchange_strong (T& expected, T val, memory_order success, memory_order failure) volatile noexcept; bool compare_exchange_strong (T& expected, T val, memory_order success, memory_order failure) noexcept; |
- 比較並交換被封裝的值(strong)與引數 expected 所指定的值是否相等,如果:
- 相等,則用 val 替換原子物件的舊值。
- 不相等,則用原子物件的舊值替換 expected ,因此呼叫該函式之後,如果被該原子物件封裝的值與引數 expected 所指定的值不相等,expected 中的內容就是原子物件的舊值。
- 該函式通常會讀取原子物件封裝的值,如果比較為 true(即原子物件的值等於 expected),則替換原子物件的舊值,但整個操作是原子的,在某個執行緒讀取和修改該原子物件時,另外的執行緒不能對讀取和修改該原子物件。
在第(2)種情況下,記憶體序(Memory Order)的選擇取決於比較操作結果,如果比較結果為 true(即原子物件的值等於 expected),則選擇引數 success 指定的記憶體序,否則選擇引數 failure 所指定的記憶體序。
注意,該函式直接比較原子物件所封裝的值與引數 expected 的物理內容,所以某些情況下,物件的比較操作在使用 operator==() 判斷時相等,但 compare_exchange_weak 判斷時卻可能失敗,因為物件底層的物理內容中可能存在位對齊或其他邏輯表示相同但是物理表示不同的值(比如 true 和 2 或 3,它們在邏輯上都表示"真",但在物理上兩者的表示並不相同)。
- 與compare_exchange_weak 不同, strong版本的 compare-and-exchange 操作不允許(spuriously 地)返回 false,即原子物件所封裝的值與引數 expected 的物理內容相同,比較操作一定會為 true。不過在某些平臺下,如果演算法本身需要迴圈操作來做檢查, compare_exchange_weak 的效能會更好。
因此對於某些不需要採用迴圈操作的演算法而言, 通常採用compare_exchange_strong 更好。另外,該函式的記憶體序由 sync 引數指定,可選條件如下:
Memory Order 值 | Memory Order 型別 |
---|---|
memory_order_relaxed | Relaxed |
memory_order_consume | Consume |
memory_order_acquire | Acquire |
memory_order_release | Release |
memory_order_acq_rel | Acquire/Release |
memory_order_seq_cst | Sequentially consistent |
- 請看下面的例子:
#include <iostream> // std::cout #include <atomic> // std::atomic #include <thread> // std::thread #include <vector> // std::vector // a simple global linked list: struct Node { int value; Node* next; }; std::atomic<Node*> list_head(nullptr); void append(int val) { // append an element to the list Node* newNode = new Node{val, list_head}; // next is the same as: list_head = newNode, but in a thread-safe way: while (!(list_head.compare_exchange_strong(newNode->next, newNode))); // (with newNode->next updated accordingly if some other thread just appended another node) } int main () { // spawn 10 threads to fill the linked list: std::vector<std::thread> threads; for (int i = 0; i < 10; ++i) threads.push_back(std::thread(append, i)); for (auto& th : threads) th.join(); // print contents: for (Node* it = list_head; it!=nullptr; it=it->next) std::cout << ' ' << it->value; std::cout << '\n'; // cleanup: Node* it; while (it=list_head) {list_head=it->next; delete it;} return 0; }
好了,本文花了大量的篇幅介紹 std::atomic 基本型別,下一篇部落格我會給大家介紹 C++11 的標準庫中std::atomic 針對整形(integral)和指標型別的特化版本做了哪些改進。