1. 程式人生 > >C#異步編程(五)異步的同步構造

C#異步編程(五)異步的同步構造

內容 cto lease amp 並發 異步編程 分析 lis ++

異步的同步構造

  任何使用了內核模式的線程同步構造,我都不是特別喜歡。因為所有這些基元都會阻塞一個線程的運行。創建線程的代價很大。創建了不用,這於情於理說不通。

  創建了reader-writer鎖的情況,如果寫鎖被長時間占有,那麽其他的讀請求線程都會被阻塞,隨著越來越多客戶端請求到達,服務器創建了更多的線程,而他們被創建出來的目的就是讓他們在鎖上停止運行。更糟糕的是,一旦writer鎖釋放,所有讀線程都同時解除阻塞並開始執行。現在,又變成大量的線程試圖在相對數量很少的cpu上運行。所以,windows開始在線程之間不同的進行上下文切換,而真正的工作時間卻很少。

鎖很流行,但長時間擁有會帶來巨大的伸縮性問題。如果代碼能通過異步的同步構造指出他想要一個鎖,那麽會非常有用。在這種情況下,如果線程得不到鎖,可直接返回並執行其他工作,而不必在那裏傻傻地阻塞。

SemaphoreSlim通過waitAsync實現了這個思路

public Task<bool> WaitAsync(int millisecondsTimeout, CancellationToken cancellationToken);

使用await asynclock.WaitAsync()就可以實現剛才說的情境。

但如果是reader-writer呢?.net framework提供了concurrentExclusiveSchedulerPair類。實例代碼如下:

private static void ConcurrentExclusiveSchedulerDemo()
{
    
var cesp = new ConcurrentExclusiveSchedulerPair(); var tfExclusive = new TaskFactory(cesp.ExclusiveScheduler); var tfConcurrent = new TaskFactory(cesp.ConcurrentScheduler); for (int i = 0; i < 5; i++) { var exclusive = i < 2; (exclusive ? tfExclusive : tfConcurrent).StartNew(() => { Console.WriteLine(
"{0} access",exclusive?"exclusive":"concurrent"); //這裏進行獨占寫入或者並發讀取操作 }); } }

遺憾的是,framework沒有提供鞠詠reader-writer語義的異步鎖。所以我們可以自己構建一個,如下:

技術分享圖片
public sealed class AsyncOneManyLock
{
    #region 鎖的代碼
    //自旋鎖不要用readonly
    private SpinLock m_lock = new SpinLock(true);

    private void Lock()
    {
        bool taken = false;m_lock.Enter(ref taken);
    }
    private void Unlock()
    {
        m_lock.Exit();
    }

    #endregion

    #region 鎖的狀態和輔助方法

    private Int32 m_state = 0;
    private bool IsFree { get { return m_state == 0; } }
    private bool IsOwnedByWriter { get { return m_state == -1; } }
    private bool IsOwnedByReader { get { return m_state > 0; } }
    private Int32 AddReaders(Int32 count) { return m_state += count; }
    private Int32 SubtractReader() { return --m_state; }
    private void MakeWriter() { m_state = -1; }
    private void MakeFree() { m_state = 0; }

    #endregion

    //目的實在非競態條件時增強性能和減少內存消耗
    private readonly Task m_noContentionAccessGranter;
    //每個等待的writer都通過他們在這裏排隊的TaskCompletionSource來喚醒
    private readonly Queue<TaskCompletionSource<Object>> m_qWaitingWriters = new Queue<TaskCompletionSource<object>>();
    //一個TaskCompletionSource收到信號,所有等待的reader都喚醒
    private TaskCompletionSource<Object> m_waitingReaderSignal = new TaskCompletionSource<object>();
    private Int32 m_numWaitingReaders = 0;
    public AsyncOneManyLock()
    {
        //創建一個返回null的任務
        m_noContentionAccessGranter = Task.FromResult<Object>(null);
    }
    public Task WaitAsync(OneManyMode mode)
    {
        Task accressGranter = m_noContentionAccessGranter;//假定無競爭
        Lock () ;
        switch (mode)
        {
            case OneManyMode.Exclusive:
                if (IsFree)
                {
                    MakeWriter();//無競爭
                }
                else
                {
                    //有競爭
                    var tcs = new TaskCompletionSource<Object>();
                    m_qWaitingWriters.Enqueue(tcs);
                    accressGranter = tcs.Task;
                }
                break;
            case OneManyMode.Shared:
                if (IsFree||(IsOwnedByReader&&m_qWaitingWriters.Count==0))
                {
                    AddReaders(1);//無競爭
                }
                else
                {
                    //有競爭,遞增等待的reader數量,並返回reader任務使reader等待。
                    m_numWaitingReaders++;
                    accressGranter = m_waitingReaderSignal.Task.ContinueWith(t => t.Result);
                }
                break;
        }
        Unlock();
        return accressGranter;
    }

    public void Release()
    {
        //嘉定沒有代碼被釋放
        TaskCompletionSource<Object> accessGranter = null;
        Lock () ;
        if (IsOwnedByWriter)
        {
            MakeFree();
        }
        else
        {
            SubtractReader();
        }
        if (IsFree)
        {
            //如果自由,喚醒一個等待的writer或所有等待的readers
            if (m_qWaitingWriters.Count>0)
            {
                MakeWriter();
                accessGranter = m_qWaitingWriters.Dequeue();
            }
            else if (m_numWaitingReaders>0)
            {
                AddReaders(m_numWaitingReaders);
                m_numWaitingReaders = 0;
                accessGranter = m_waitingReaderSignal;
                //為將來需要等待的readers創建一個新的tcs
                m_waitingReaderSignal = new TaskCompletionSource<object>();
            }
        }
        Unlock();
        //喚醒鎖外面的writer/reader,減少競爭幾率以提高性能
        if (accessGranter!=null)
        {
            accessGranter.SetResult(null);
        }
    }
}
AsyncOneManyLock

上述代碼永遠不會阻塞線程。原因是內部沒有沒有很實用任何內核構造。這裏確實使用了一個SpinLock,它在內部使用了用戶模式構造。但是他的執行時間很短,WaitAsync方法裏,只是一些整數計算和比較,這也符合只有執行時間很短的代碼段才可以用自旋鎖來保護。所以使用一個spinLock來保護對queue的訪問,還是比較合適的。

並發集合類

FCL自帶4個線程安全的集合類,全部在System.Collections.Concurrent命名空間中定義。它們是ConcurrentStack、concurrentQueue、concurrentDictionary、concurrentBag。

所有這些集合都是“非阻塞”的,換而言之,如果一個線程試圖提取一個不存在的元素(數據項),線程會立即返回;線程不會阻塞在那裏,等著一個元素的出現。正是由於這個原因,所以如果獲取了一個數據項,像tryDequeue,tryPop,tryTake和tryGetValue這樣的方法全部返回true;否則返回false。

一個集合“非阻塞”,並不意味著他就不需要鎖了。concurrentDictionary類在內部使用了Monitor。但是,對集合中的項進行操作時,鎖只被占有極短的時間。concurrentQueue和ConcurrentStack確實不需要鎖;他們兩個在內部都使用interlocked的方法來操縱集合。一個concurrentBag對象由大量迷你集合對象構成,每個線程一個。線程將一個項添加到bag中時,就用interlocked的方法將這個項添加到調用線程的迷你集合中。一個線程視圖從bag中提取一個元素時,bag就檢查調用線程的迷你集合,試圖從中取出數據項。如果數據項在哪裏,就用一個interlocked方法提取這個項。如果不在,就在內部獲取一個monitor,以便從 線程的迷你集合提取一個項。這稱為一個線程從另一個線程“竊取”一個數據項。

註意,所有並發集合類都提供了getEnumerator方法,他一般用於C#的foreach語句,但也可用於Linq。對於concurrentQueue、ConcurrentStack和concurrentBag類,getEnumerator方法獲取集合內容的一個“快照”,並從這個快照中返回元素;實際集合內容可能在使用快照枚舉時發生改變。concurrentDictionary的getEnumerator的該方法不獲取他內容的快照。因此,在枚舉字典期間,字典的內容可能改變。還要註意,count屬性返回的是查詢時集合中的元素數量,如果其他線程同時正在集合中增刪,這個計數可能馬上就變得不正確。

ConcurrentStack、concurrentQueue、concurrentBag都實現了IProducerConsumerCollection接口,實現了這個接口的任何類都能轉變成一個阻塞集合,不過,盡量不使用這種阻塞集合。

這裏我們重點介紹下concurrentDictionary。

ConcurrentDictionary

這裏我對.net core中ConcurrentDictionary源碼進行了分析,裏面采用了Volatile.Read和write,然後也使用了lock這種混合鎖,而且還定義了更細顆粒度的鎖。所以多線程使用ConcurrentDictionary集合還是比較好的選擇。

TryRemove

這個方法會調用內部私用的TryRemoveInternal

技術分享圖片
private bool TryRemoveInternal(TKey key, out TValue value, bool matchValue, TValue oldValue)
{
    int hashcode = _comparer.GetHashCode(key);
    while (true)
    {
        Tables tables = _tables;
        int bucketNo, lockNo;
        //這裏獲取桶的索引和鎖的索引,註意,鎖的索引和桶未必是同一個值,具體算法看源碼。
        GetBucketAndLockNo(hashcode, out bucketNo, out lockNo, tables._buckets.Length, tables._locks.Length);
        //這裏鎖住的只是對應這個index指向的鎖,而不是所有鎖。
        lock (tables._locks[lockNo])
        {
            //這裏table可能被重新分配,所以這裏再次獲取,看得到的是不是同一個table
            // If the table just got resized, we may not be holding the right lock, and must retry.
            // This should be a rare occurrence.
            if (tables != _tables)
            {
                continue;
            }

            Node prev = null;
            //這裏同一個桶,可能因為連地址,有很多值,所以要對比key
            for (Node curr = tables._buckets[bucketNo]; curr != null; curr = curr._next)
            {
                Debug.Assert((prev == null && curr == tables._buckets[bucketNo]) || prev._next == curr);
                //對比是不是要刪除的的那個元素
                if (hashcode == curr._hashcode && _comparer.Equals(curr._key, key))
                {
                    if (matchValue)
                    {
                        bool valuesMatch = EqualityComparer<TValue>.Default.Equals(oldValue, curr._value);
                        if (!valuesMatch)
                        {
                            value = default(TValue);
                            return false;
                        }
                    }
                    //執行刪除,判斷有沒有上一個節點。然後修改節點指針或地址。
                    if (prev == null)
                    {
                        Volatile.Write<Node>(ref tables._buckets[bucketNo], curr._next);
                    }
                    else
                    {
                        prev._next = curr._next;
                    }

                    value = curr._value;
                    tables._countPerLock[lockNo]--;
                    return true;
                }
                prev = curr;
            }
        }
        value = default(TValue);
        return false;
    }
}
TryRemoveInternal

TryAdd

這個方法會調用內部私用的TryAddInternal

TryAddInternal(key, _comparer.GetHashCode(key), value, false, true, out dummy);

技術分享圖片
/// <summary>
/// Shared internal implementation for inserts and updates.
/// If key exists, we always return false; and if updateIfExists == true we force update with value;
/// If key doesn‘t exist, we always add value and return true;
/// </summary>
private bool TryAddInternal(TKey key, int hashcode, TValue value, bool updateIfExists, bool acquireLock, out TValue resultingValue)
{
    Debug.Assert(_comparer.GetHashCode(key) == hashcode);
    while (true)
    {
        int bucketNo, lockNo;
        Tables tables = _tables;
//老方法了,不多說,獲取hash索引和鎖索引
        GetBucketAndLockNo(hashcode, out bucketNo, out lockNo, tables._buckets.Length, tables._locks.Length);
        bool resizeDesired = false;
        bool lockTaken = false;
        try
        {
            //這裏都是true的,所以會獲取鎖
            if (acquireLock)
                Monitor.Enter(tables._locks[lockNo], ref lockTaken);

            // If the table just got resized, we may not be holding the right lock, and must retry.
            // This should be a rare occurrence.
            if (tables != _tables)
            {
                continue;
            }

            // Try to find this key in the bucket
            Node prev = null;
            //查看對應的桶裏,
            for (Node node = tables._buckets[bucketNo]; node != null; node = node._next)
            {
                Debug.Assert((prev == null && node == tables._buckets[bucketNo]) || prev._next == node);
                //查看有沒有相同的key值,有就返回false
                if (hashcode == node._hashcode && _comparer.Equals(node._key, key))
                {
                    // The key was found in the dictionary. If updates are allowed, update the value for that key.
                    // We need to create a new node for the update, in order to support TValue types that cannot
                    // be written atomically, since lock-free reads may be happening concurrently.
                    //這個應該是addorupdate使用的,存在就更新。
                    if (updateIfExists)
                    {
                        if (s_isValueWriteAtomic)
                        {
                            node._value = value;
                        }
                        else
                        {
                            Node newNode = new Node(node._key, value, hashcode, node._next);
                            if (prev == null)
                            {
                                Volatile.Write(ref tables._buckets[bucketNo], newNode);
                            }
                            else
                            {
                                prev._next = newNode;
                            }
                        }
                        resultingValue = value;
                    }
                    else
                    {
                        resultingValue = node._value;
                    }
                    return false;
                }
                prev = node;
            }
            
            // The key was not found in the bucket. Insert the key-value pair.
            Volatile.Write<Node>(ref tables._buckets[bucketNo], new Node(key, value, hashcode, tables._buckets[bucketNo]));
            //這裏checked檢查是否存在溢出。
            checked
            {
                tables._countPerLock[lockNo]++;
            }
            // If the number of elements guarded by this lock has exceeded the budget, resize the bucket table.
            // It is also possible that GrowTable will increase the budget but won‘t resize the bucket table.
            // That happens if the bucket table is found to be poorly utilized due to a bad hash function.
            // _budget是 The maximum number of elements per lock before a resize operation is triggered
            if (tables._countPerLock[lockNo] > _budget)
            {
                resizeDesired = true;
            }
        }
        finally
        {
            if (lockTaken)
                Monitor.Exit(tables._locks[lockNo]);
        }
        // The fact that we got here means that we just performed an insertion. If necessary, we will grow the table.
        //
        // Concurrency notes:
        // - Notice that we are not holding any locks at when calling GrowTable. This is necessary to prevent deadlocks.
        //As a result, it is possible that GrowTable will be called unnecessarily. But, GrowTable will obtain lock 0

        //   and then verify that the table we passed to it as the argument is still the current table.
        if (resizeDesired)
        {
            GrowTable(tables);
        }
//賦值
        resultingValue = value;
        return true;
    }
}
TryAddInternal

TryGetValue

TryGetValueInternal(key, _comparer.GetHashCode(key), out value);

private bool TryGetValueInternal(TKey key, int hashcode, out TValue value)
{
    Debug.Assert(_comparer.GetHashCode(key) == hashcode);
    //用本地變量保存這個table的快照。
    // We must capture the _buckets field in a local variable. It is set to a new table on each table resize.
    Tables tables = _tables;
    int bucketNo = GetBucket(hashcode, tables._buckets.Length);
    // We can get away w/out a lock here.
    // The Volatile.Read ensures that we have a copy of the reference to tables._buckets[bucketNo].
    // This protects us from reading fields (‘_hashcode‘, ‘_key‘, ‘_value‘ and ‘_next‘) of different instances.
Node n = Volatile.Read<Node>(ref tables._buckets[bucketNo]);
//如果key相符 ,賦值,不然繼續尋找下一個。
    while (n != null)
    {
        if (hashcode == n._hashcode && _comparer.Equals(n._key, key))
        {
            value = n._value;
            return true;
        }
        n = n._next;
    }
    value = default(TValue);//沒找到就賦默認值
    return false;
}

C#異步編程(五)異步的同步構造