1. 程式人生 > >C#多線程編程(6)--線程安全2 互鎖構造Interlocked

C#多線程編程(6)--線程安全2 互鎖構造Interlocked

推薦 class ++ 需要 ons 簡單的 版本 com eva

在線程安全1中,我介紹了線程同步的意義和一種實現線程同步的方法:volatile。volatile關鍵字屬於原子操作的一種,若對一個關鍵字使用volatile,很多時候會顯得很“浪費”,因為只有在並發訪問的情況下才需要“易變”讀寫,單線程訪問時並不需要。在命名空間System.Threading命名空間中提供了InterLock類,該類中提供了一些原子方法。本文來介紹如何使用這些方法。

  •   互鎖

  在搶占式系統中,一個線程在執行到任何階段都有可能被其他線程“打斷”,原子操作能夠保證該操作不被其他線程打斷,其他線程的“打斷”只可能發生在該操作的之前或之後。InterLock類中的方法就是”原子“式的,最常用的方法有:

public static class InterLock{
    // return (++location)
    public static int Increment(ref int location);
    // return(--location)
    public static int Decrement(ref int location);
    // return(location += value)
    //註意,也可能是負數,從而實現減法運算。
    public static int Add(ref int location, int value)
    // int old = location; location = value; return old;
public static int Exchange(ref int location, int value); //old = location1; //if(location1 = comparand) location1 = value; //return old; public static int CompareExchange(ref int location1, int value, int comparand); ... }

  《CLR via C#》的作者在書中說他喜歡Interlocked中的方法,因為它不但很快,而且不會阻塞線程。為了驗證InterLock真的很快,我們對變量進行一百萬次的寫,與Volatile.Write()來進行對比,看看是否真的快。

 static void Main(string[] args){
     TestInterLock();
     Console.ReadLine();
 }
 private static volatile int v = 0;
 private static int n = 0;
 static void TestInterLock(){
     Stopwatch sw = Stopwatch.StartNew();
     for (int i = 0; i < 1000000; i++)
         v++;
     Console.WriteLine("volatile write 1000000 times takes:{0}", sw.ElapsedMilliseconds);
     sw = Stopwatch.StartNew();
     for (int i = 0; i < 1000000; i++)
         Interlocked.Increment(ref n);
     Console.WriteLine("InterLock write 1000000 times takes:{0}", sw.ElapsedMilliseconds);
     n = 0;
     sw = Stopwatch.StartNew();
     for (int i = 0; i < 1000000; i++)
         n++;
     Console.WriteLine("n++ write 1000000 times takes:{0}", sw.ElapsedMilliseconds);
 }

  運行結果如下:

volatile write 1000000 times takes:2

InterLock write 1000000 times takes:8

n++ write 1000000 times takes:2

  我運行了好幾次,結果會有些出入,但是大部分的結果都是volatile的寫入速度和原生的n++的速度是一樣的。InterLock也確實如Jeffrey Richter所說,很快,只是沒有volatile關鍵字修飾的變量的讀寫快。這是在非並發情況下,下面來看一下並發情況下是否還是很快。

static void TestInterLock1(){
    Stopwatch sw = Stopwatch.StartNew();
    Task[] t2 = new[] { new Task(() =>    {
        for (int i = 0; i < 1000000; i++)
            Interlocked.Increment(ref n);
    }), new Task(() => { 
        for (int i = 0; i < 1000000; i++)
            Interlocked.Increment(ref n);}) };
    Task[] t1 = new[] { new Task(() =>    {
        for (int i = 0; i < 1000000; i++)
            v++;
    }), new Task(() => { 
        for (int i = 0; i < 1000000; i++)
            v++;}) };
    Task.WhenAll(t1)
        .ContinueWith(t => Console.WriteLine("volatile write 2000000 times takes:{0}", sw.ElapsedMilliseconds));
    Task.WhenAll(t2)
        .ContinueWith(t => Console.WriteLine("InterLock write 2000000 times takes:{0}", sw.ElapsedMilliseconds));
    t2[0].Start();
    t1[0].Start();
    t1[1].Start();
    t2[1].Start();
}

  先看運行結果,

volatile write 2000000 times takes:94
InterLock write 2000000 times takes:101

  多次運行,運行時間會有不同,但是在並發情況下,volatile的寫入和InterLock的寫入速度幾乎相同。上述代碼寫的如此醜陋,而不是直接寫Task.Run(),是為了保證初始化部分都運行完成後,再Start(),且兩個任務的先後順序進行了打亂,最大限度減少誤差。可以看到並發情況下,volatile和InterLock幾乎一樣,且在InterLock中的方法要比Volatile的功能要全,但是在串行時,Volatile的性能要比InterLock要好。結論是,若只對變量讀寫,沒有替換或者其他復雜操作時,可以使用volatile關鍵字,但是一些復雜操作,需要原子操作時,就得使用InterLock中的方法了,如果使用volatile關鍵字修飾的變量來進行交換的話,很難保證原子性,只有引入鎖才能保證線程同步。且InterLock中提供了幾個重載方法,能夠接受object類型,還有泛型版本。

可以利用InterLock來實現一個簡單的自旋鎖,代碼如下:

public struct SimpleSpinLock{
    private int m_Lock;

    public void Enter(){
        while (true){
            if (Interlocked.Exchange(ref m_Lock, 1) == 0) return;
            //此處可以添加黑科技
        }
    }

    public void Leave(){
        Volatile.Write(ref m_Lock, 0);
    }
}

//下面是如何使用SimpleSpinLock的例子
public class Simple{
    private SimpleSpinLock m_lock = new SimpleSpinLock();

    public void AccessResource(){
        m_lock.Enter();
        //執行某些程序,只有一個線程可以進入這裏
        m_lock.Leave();
    }
}

  這個簡單的自旋鎖在一個線程調用Enter()時,其他線程在調用m_lock.Enter()方法時,if (Interlocked.Exchange(ref m_lock, 1) == 0)會失敗,因為該方法會將m_lock和1交換,並返回舊值,在已有線程調用m_lock.Enter()時,m_lock的舊值是1,因此該方法會在whle(true)處自旋,不斷嘗試獲得鎖。該鎖的問題是,該線程沒有被阻塞(掛起),而是一直在占用CPU資源,其他需要CPU資源的線程無法運行(可以在while內,我加註釋的地方,加入”黑科技“來嘗試解決此問題。其思路是在線程自旋的過程中,立刻交出CPU資源,可通過調用Thread.Sleep(0)或者Thread.Yield()來實現。嘗試獲得鎖的線程交出時間片,這樣當前獲得鎖的線程能夠有更多的資源來運行程序,從而運行結束並交出鎖,具體細節在這裏不展開)。因此,自旋鎖只適合那些運行非常快的方法。

  • Interlocked Anything 模式

  Interlocked中全部是原子性操作,那是否提供了一個方法,該方法可以接受一個委托,保證該委托在運行時是原子的。答案是沒有,但是可以利用Interlocked.CompareExchange來自己實現一個。我們先來看一下利用CompareExchange來實現原子性的Maximum方法。

public static int Maximum(ref int target, int value){
    int currentValue = target, startValue, desireValue;
    do{
        startValue = currentValue;
        //可以在此處添加任何想要保證“原子性”的操作,此處是求最大值。
        desireValue = Math.Max(startValue, value);
        //註意,此處有可能被其他線程搶占,也有可能target的值被修改,因此if()語句會出問題,要使用Interlocked的方法。
        //if (startValue = target) target = desireValue;
        currentValue = Interlocked.CompareExchange(ref target, desireValue, startValue);
    } 
    //若在此時target已被修改,則重新計算最大值
    while (startValue != currentValue);
    return currentValue;
}

  此方法的思路是:從CPU將target的值讀到寄存器中,到計算最大值結束,期間的任何時間target都有可能被其他線程修改。因此保證原子性就被轉換成保證計算最大值時,target的值沒有變過,如果變過,就重新計算。因此,在最開始的時候,startValue=currentValue,currentValue是開始計算時target的值。然後求得最大值,並保存到desireValue中。註意,此時target有可能被修改,因此調用CompareExchange方法,該方法會將target與startValue比較,如果此時兩值相等,那相當於我之前說的,target從開始到最後沒有改變,那麽這個最大值是準確的,並將target的舊值付給currentValue,最後如果startValue==currentValue,則計算完成,否則繼續循環。

  《CLR via C#》的作者Jeff很喜歡上面的方法,他在實際開發中,都是使用上面的方法,並對其進行了包裝,使之能夠支持Interlocked Anything。其原理就是在desireValue=Math.Max()處替換成其他方法,只要在返回結果時保證舊值和最開始讀取的值一致就可以。我們來看一下他的封裝:

delegate int Morpher<TResult, in TArgument>(int startValue, TArgument argument, out TResult result);

static TResult Morph<TResult, TArgumen>(ref int target, TArgumen argumen, Morpher<TResult, TArgumen> morpher){
    TResult mophorResult;
    int currentValue = target, desireValue, startValue;
    do{
        startValue = currentValue;
        desireValue = morpher(startValue, argumen, out mophorResult);
        currentValue = Interlocked.CompareExchange(ref target, desireValue, startValue);
    } 
    while (currentValue != startValue);
    return mophorResult;
}

說實話,我並不能非常好了理解這個封裝,並不是不能理解做法,而是不能確定此方法到底能不能實現效果,那我們來測試一下。測試的基本思路是對一個變量執行1000次的result+=10。分別是不帶線程同步的和利用Morph方法對result+=10的方法進行互鎖,保證其原子性。我省去了Morpher和Morph的聲明部分。之所以要在DelayAdd和DelayAdd1方法中調用ThreadSleep(20),是為了模擬當在運行較長的方法時,Morph方法是否還能夠保證該方法運行的原子性。

static void Main(string[] args){
    Test(new TestAction(Add), "Add");
    Test(new TestAction(MorphAdd), "MorphAdd");
    Console.ReadLine();
}
//DelayAdd1的變種,使之能夠符合Morpher的簽名
static int DelayAdd(int startValue, int argument, out int result){
    Thread.Sleep(20);
    result = startValue + argument;
    return result;
}

static void DelayAdd1(int argument, ref int result){
    Thread.Sleep(20);
    result += argument;
}
//測試的不具有線程同步的方法。
static void Add(ref int result){
    DelayAdd1(10, ref result);
}
//具有線程同步的方法。
static void MorphAdd(ref int result){
    Morph(ref result, 10, new Morpher<int, int>(DelayAdd));
}
//要測試的委托簽名
delegate void TestAction(ref int result);
//公共測試方法
static void Test(TestAction action, string actionName){
    int result = 0;
    var tList = new Task[1000];
    for (int i = 0; i < 1000; i++)
        tList[i] = Task.Run(() =>
        {
            action(ref result);
        });
    Task.WhenAll(tList).GetAwaiter().OnCompleted(
        () => Console.WriteLine("{0}, Result is {1}", actionName, result));
}

運行,得到的結果是:

Add, Result is 8440
MorphAdd, Result is 10000

運行1000次result += 10,普通的Add不能夠得到正確結果,但是MorphAdd可以。這是因為在1000次的Add中,某幾個Add是同時調用的,result+=10在同一時間調用了多次,因此有156次的Add因為並行而被吞噬了。值得註意的是,MorphAdd方法因為需要線程同步,因此執行時間要慢很多。但是這些付出是值得的,因為這保證了結果的正確。

  上述例子證明了Morph方法能夠保證委托的原子性,且該方法既不會阻塞線程也不會長時間的自旋,推薦大家在實際中使用該方法。

  本文中,我先介紹了Interlocked類中較常用的方法,以及Interlocked.Increment()方法與volatile關鍵字的對比,結論是雖然將變量設置為所有讀寫都是“易變的”看起來很浪費,但是該關鍵字能夠保證在單線程時幾乎沒有性能損失,大部分情況下和原生的讀寫是一樣的速度,且volatile比Interlocked類中提供的寫要快2-4倍,但是在並發狀態下其性能和volatile關鍵字是沒有差別的。之後我介紹了用Interlocked類中的方法來實現簡單的自旋鎖,該鎖的優點是在非竟態情況下非常快,但是在竟態情況下,未獲得鎖的線程會一直處於自旋狀態,白白浪費CPU。最後介紹了《CLR via C#》書中提到的Interlocked Anything的方法(文中其他的知識點大多也是提取自《CLR via C#》),並測試了該方法確實可以保證委托的原子性,且不會阻塞線程,沒有鎖,不會造成死鎖。至此線程同步中互鎖構造就講完了,後面我會給大家介紹內核構造的信號量和其他鎖。

C#多線程編程(6)--線程安全2 互鎖構造Interlocked