1. 程式人生 > >漫談C++11 Thread庫之原子操作

漫談C++11 Thread庫之原子操作

      我在之前一篇博文《漫談C++11 Thread庫之使寫多執行緒程式中,著重介紹了<thread>標頭檔案中的std::thread類以及其上的一些基本操作,至此我們動手寫多執行緒程式已經基本沒有問題了。但是,單執行緒的那些"坑"我們仍還不知道怎麼去避免。

多執行緒存在的問題

      多執行緒最主要的問題就是共享資料帶來的問題。如果共享資料都是隻讀的,那麼沒問題,因為只讀操作不會影響到資料,更不會涉及對資料的修改,所以所有執行緒都會獲得同樣的資料。但是,當一個或多個執行緒要修改共享資料時,就會產生很多潛在的麻煩。

#include <iostream>
#include <thread>

long sum = 0L;

void fun()
{
    for(int i=1;i<100000;++i)
        sum += i;
}

int main()
{
    std::cout << "Before joining,sun = " << sum << std::endl;
    std::thread t1(fun);
    std::thread t2(fun);
    t1.join();
    t2.join();
    std::cout << "After joining,sun = " << sum << std::endl;
}

  程式結構很簡單,啟動兩個執行緒分別對變數sum加上 1-99999。其記憶體結構大致上是這樣的。

c++多執行緒程式中,每個執行緒都有一個執行緒棧,它們相互獨立,因此線上程棧中的資料,是不會被其他執行緒影響到的。但是在記憶體的資料段中的資料,是可以在全域性被訪問到的。我們在上面這段程式碼中定義的sum變數正是位於資料段中。

      在目前來看,我們期望最後程式退出的時候,打印出sum是 9999900000。但是結果卻不盡人意,我們試著編譯執行:

[thread]g++ condition.cpp -omain -std=c++11 -lpthread
[thread]main
Before joining,sun = 0
After joining,sun = 5192258282
[thread]main
Before joining,sun = 0
After joining,sun = 8418413412
[thread]main
Before joining,sun = 0
After joining,sun = 5294478585

  顯然結果還是比較意外的,運行了三次,都得到了不同的結果,而且沒有一次得到我們的期望值,這下我們精準地踩中了多執行緒的"坑"。試著多執行幾遍,看看會不會出現正確的結果。當然手動執行幾遍甚至幾十遍,還是可以應付得了的。但是要執行幾千遍,手動執行下來估計手就得抽筋了。這樣的機械般的操作還是交給shell指令碼吧,由於我的機器配置不是很牛×,暫且先1000次看看,shell指令碼如下,count.sh:

#!/bin/bash
#result equal with 9999900000
cnt=0
#result more than 9999900000
cnt_more=0 
#result less than 9999900000
cnt_less=0 
for((i=0;i<1000;++i))
do
    var=$(main|tail -1)
    var=${var#After joining,sun = }
    if(($var == 9999900000))
    then
        ((cnt++))
    fi
    if(($var > 9999900000))
    then
        ((cnt_more++))
    fi
    if(($var < 9999900000))
    then
        ((cnt_less++))
    fi
done

echo "cnt="$cnt
echo "cnt_more="$cnt_more
echo "cnt_less="$cnt_less

  其中變數cnt來統計1000次執行中總共得到過多少次的正確結果,用cnt_more統計偏大的結果,用cnt_less統計偏小的結果。這是該指令碼的執行結果:

[thread]count.sh
cnt=315
cnt_more=0
cnt_less=685

  1000次執行中還是有315次得到了正確答案,有685次的結果是偏小的,卻沒有一次的結果是偏大的!那麼問題出在哪裡了?試著想象一下這樣一個場景:你和朋友合租在一間房子裡邊,房子裡面只有一間廚房,你們共用一個鍋。有一天你準備做一道西紅柿炒蛋,當你把西紅柿放入鍋中的時候,你的電話響了,你離開廚房去接電話。而這時候你的室友也要做飯,他要做一道紅燒魚,於是他把洗好的魚放入了鍋中煮,然後也離開了廚房(由於某種原因他不知道鍋裡還有你的食材,在程式中執行緒也不會知道其他執行緒對共享的資料做了什麼)。當你回來的時候繼續往裡邊放入雞蛋,最後你得到的是一盤西紅柿炒雞蛋魚。而你的室友回來廚房的時候他要的紅燒魚就會不見了。

      在上面的例子裡,你和室友就代表著thread1和thread2執行緒,sum變數就是那個鍋。多執行緒中共享資料的問題,就是上面場景中你們共用一口鍋造成的問題。

原子操作

      要解決上面場景的問題,其中有一中可行的方案就是:你們做菜的步驟很短,短到什麼程度呢,短到這個步驟不可被分割。例如你做的這道菜只有一個步驟,就是讓食材(對應於下面提到的原子資料型別)碰一下鍋(當然現實場景中基本沒有這樣的菜),這樣你們的做菜過程就不會被其他室友打斷、干擾,即使你們共同在使用一口鍋。

      而上面的程式碼中的 sum += i 在CPU指令的層面上是可以被分割的,我用g++的-S選項生成其彙編的指令看到了一段這樣的程式碼:

movl	$0, -4(%ebp)     // sum = 0
movl	$0, -8(%ebp)     // i =0
......
movl	-8(%ebp), %eax   //將i送入暫存器eax
addl	%eax, -4(%ebp)   //將i的值加上sum的值,將結果儲存到 sum中。
movl	$0, %eax

  彙編指令還是描述的比較清楚的,可以清楚的看到 sum += i;操作被分割成了兩條cpu指令,先是將i的值儲存在eax暫存器中,然後將eax的值加上sum的值並儲存在sum中。

      而在c++中原子操作就是這樣的一種『小到不可分割的』操作。要使用原子操作我們需要引用c++11的一個新的標頭檔案<atomic>。在這個標頭檔案中定義了一個類模板struct atomic表示原子資料型別,在GNU的實現(/usr/include/c++/4.8.3/atomic)上如下:

template<typename _Tp>
struct atomic
{
private:
   _Tp _M_i;
public:
    atomic() noexcept = default;
    ~atomic() noexcept = default;
    atomic(const atomic&) = delete;                //刪除了拷貝構造
    atomic& operator=(const atomic&) = delete;        
    atomic& operator=(const atomic&) volatile = delete;   //刪除了 operator=
    constexpr atomic(_Tp __i) noexcept : _M_i(__i) { }
    operator _Tp() const noexcept
    { 
    	  return load();
    }

    operator _Tp() const volatile noexcept
    { 
    	  return load();
    }

    _Tp operator=(_Tp __i) noexcept 
    { 
    	store(__i);
    	return __i;
    }

    ...
};

  atomic模板中還實現了操作符的過載(由於篇幅,檢視完整的類結構請參閱atomic標頭檔案),因此你可以像使用內建的資料型別那樣使用原子資料型別(c++保證這些操作是原子操作)。對應於內建的資料型別,原子資料型別都有一份對應的型別,歸納出來如下:

std::atomic_char std::atomic<char>
std::atomic_schar std::atomic<signed char>
std::atomic_uchar std::atomic<unsigned char>
std::atomic_short std::atomic<short>
std::atomic_ushort std::atomic<unsigned short>
std::atomic_int std::atomic<int>
std::atomic_uint std::atomic<unsigned int>
std::atomic_long std::atomic<long>
std::atomic_ulong std::atomic<unsigned long>
std::atomic_llong std::atomic<long long>
std::atomic_ullong std::atomic<unsigned long long>

      我們之前的sum變數是long型別的,對應的原子資料型別是std::atomic_long,下面我們就簡單的修改一下開篇的程式碼:

#include <iostream>
#include <thread>
#include <atomic>                 // modified

std::atomic_long sum = {0L};    //  modified

void fun()
{
    for(int i=0;i<100000;++i)
        sum += i;
}

int main()
{
    std::cout << "Before joining,sun = " << sum << std::endl;
    std::thread t1(fun);
    std::thread t2(fun);
    t1.join();
    t2.join();
    std::cout << "After joining,sun = " << sum << std::endl;
}

  我們只增加了一個<atomic>標頭檔案,並且將 long sum = 0L; 修改成了 std::atomic_long sum {0L}; 注意不要寫成『std::atomic_long sum = 0L』的形式,因為long型別是不可以隱式轉換為std::atomic_long型別的。

      為了證明不是偶然性,我們仍用上面的count.sh這個指令碼執行1000次上面的修改過的程式:

[thread]g++ atomic.cpp -o main -std=c++11 -lpthread
[thread]count.sh
cnt=1000
cnt_more=0
cnt_less=0

可以看到原子操作還是有明顯的效果的,這1000次的執行我們都得到了正確的結果。事實證明原子操作的確可以作為解決共享資料引起的問題的一種有效的手段。

"自旋鎖"——atomic_flag

      和其他的原子資料型別(包括atomic_bool)不同的是,他是鎖無關(lock-free)的一種型別,即執行緒對它的訪問是不需要加鎖的,因此他也沒有其他的原子型別的讀寫操作(,store())、運算子操作等。取而代之的是另外兩個原子操作的函式test_and_set()clear()。atomic_flag類的結構在GNU上是這樣的:

#if __GCC_ATOMIC_TEST_AND_SET_TRUEVAL == 1
    typedef bool __atomic_flag_data_type;
#else
    typedef unsigned char __atomic_flag_data_type;
#endif

struct __atomic_flag_base
{
    __atomic_flag_data_type _M_i;
};
  
struct atomic_flag : public __atomic_flag_base
{ 
...
bool test_and_set(memory_order __m = memory_order_seq_cst) noexcept; bool test_and_set(memory_order __m = memory_order_seq_cst) volatile noexcept; void clear(memory_order __m = memory_order_seq_cst) noexcept; void clear(memory_order __m = memory_order_seq_cst) volatile noexcept;
... private: static constexpr __atomic_flag_data_type _S_init(bool __i) { return __i ? __GCC_ATOMIC_TEST_AND_SET_TRUEVAL : 0; } };

      atomic_flag::test_and_set()和其名字一樣,大致上是這樣工作的:首先檢查這atomic_flag類中的bool成員_M_i是否被設定成true,如果沒有就先設定成true,並返回之前的值(flase),如果atomic_flag中的bool成員已經是true,則直接返回true

      相比較而言atomic_flag::clear()更加簡單粗暴,它直接將atomic_flag的bool值得標誌成員_M_i設定成flase,沒有返回值

      既然小標題是『自旋鎖——atomic_flag』,那麼我們看看這把自旋鎖(spin lock)是怎麼用的:

#include <iostream>
#include <atomic>
#include <unistd.h>
#include <thread>

std::atomic_flag lock = ATOMIC_FLAG_INIT;                               //初始化

void f(int n)
{
    while(lock.test_and_set())                                          //獲取鎖的狀態
        std::cout << "Waiting ... " << std::endl;
    std::cout << "Thread " << n << " is starting working." << std::endl;
}

void g(int n)
{
    sleep(3);
    std::cout << "Thread " << n << " is going to clear the flag." << std::endl;
    lock.clear();                                                       // 解鎖
}

int main()
{
    lock.test_and_set();
    std::thread t1(f,1);
    std::thread t2(g,2);

    t1.join();
    t2.join();
}

  進入main函式後我們就先設定好atomic_flag,然後啟動了兩個執行緒t1和t2,其中t1中我們一直迴圈獲取atomic_flag的狀態,知道t2睡眠3秒後,clear()掉lock的鎖定狀態。其執行結果:

[thread]g++ atomic_flag.cpp -o main -std=c++11 -lpthread
[thread]main
Waiting ... 
Waiting ... 
Waiting ... 
Waiting ... 
Waiting ... 
// omit lager of "waiting..." 
thread 2 is going to clear the flag.
Thread 1 is starting working.

這樣的結果正合我們的期望,實際上我們就是通過自旋鎖實現了讓t1執行緒一直在等待t2執行緒。

      更進一步地我們還可以通過簡單的封裝,來實現一把鎖。MyLock.h(為了直觀我就都寫到一個檔案中了):

#ifndef __MYLOCK_H_
#define __MYLOCK_H_
#include <iostream>
#include <atomic>
#include <thread>

class MyLock
{
private:
    std::atomic_flag m_flag;
public:
    MyLock();
    void lock();
    void unlock();
};

MyLock::MyLock()
{
    m_flag.clear();                    //if not do this,m_flag will be unspecified
}

void MyLock::lock()
{
    while(m_flag.test_and_set())
        ;
}

void MyLock::unlock()
{
    m_flag.clear();
}
#endif

  現在我們就試著使用這把鎖,來改寫開篇的那個程式:

#include <iostream>
#include <thread>
#include "MyLock.h"    //code above 

MyLock lk;

long sum = 0;

void add()
{
    for(int i=0;i<100000;++i)
    {
        lk.lock();
        sum += i;
        lk.unlock();
    }
}

int main()
{
    std::thread t1(add);
    std::thread t2(add);

    t1.join();
    t2.join();

    std::cout << "sum = " << sum << std::endl;
}

  執行後沒有問題,正確打印出結果sum=9999900000。

記憶體順序語義

bool test_and_set(std::memory_order order = std::memory_order_seq_cst) volatile;
bool test_and_set(std::memory_order order = std::memory_order_seq_cst);
void clear( std::memory_order order = std::memory_order_seq_cst ) volatile;
void clear( std::memory_order order = std::memory_order_seq_cst );

這兩個函式原型包含了一個新的資料型別std::memory_order,這是一個列舉型別,其具體的定義在<bits/atomic_base.h>標頭檔案中(/usr/include/c++/4.8.3/bits/atomic_base.h)。所有的列舉值得具體意義,我都查閱資料,註釋在後邊,如下:

typedef enum memory_order
    {
      memory_order_relaxed,   //不對執行順序做任何保證
      memory_order_consume,    //本執行緒中所有有關本原子型別的操作,必須等到本條原子操作完成之後進行
      memory_order_acquire,    //本執行緒中,後續的讀操作必須在本條原子操作完成後進行
      memory_order_release,    // 本執行緒中,之前的寫操作完成後才執行本條原子操作
      memory_order_acq_rel,    //memory_order_acquire和memory_order_release 效果的合併
      memory_order_seq_cst     //順序一致
} memory_order;

test_and_set()和clear()的預設引數都是使用的memory_order_seq_cst這個列舉值,其語義上是順序一致性(sequential consistent)。順序一致性是指執行緒執行的順序和我們程式設計師所寫程式碼的順序是一致的。我們首次接觸這個概念的時候,可能會感到疑惑,一直以來我們都理所當然的以為我們寫的是什麼,程式就怎麼幹。其實不然。當編譯器在編譯我們的原始碼的時候會權衡我們的程式碼做出適當的優化,如果編譯器認為執行順序和程式輸出結果無直接影響,那麼就可能會重排序(reorder)指令以提高效能。而memory_order_seq_cst則保證了順序執行程式。如上邊memory_order定義的那樣,在C++11,並不是只支援順序一致的記憶體模型,因為順序一致意味著最低效。

      關於記憶體順序個人以為這和硬體的關係跟大一些,在此不再用過多篇幅討論。瞭解一下應該就夠了。

      最後謝謝你的閱讀,如果你能給我一點建議的話,那就更好了。