1. 程式人生 > >一起來看CORE源碼(一) ConcurrentDictionary

一起來看CORE源碼(一) ConcurrentDictionary

represent repl hashcode 識別 await condition 私有變量 repr call

先貼源碼地址

https://github.com/dotnet/corefx/blob/master/src/System.Collections.Concurrent/src/System/Collections/Concurrent/ConcurrentDictionary.cs

.NET CORE很大一個好處就是代碼的開源,你可以詳細的查看你使用類的源代碼,並學習微軟的寫法和實現思路。

  這裏我對.net core中ConcurrentDictionary源碼進行了分析,裏面采用了Volatile.Read和write(volatile作用:確保本條指令不會因編譯器的優化而省略,且要求每次直接從內存地址讀值,而不走寄存器),然後也使用了lock這種混合鎖,而且還定義了更細顆粒度的鎖。所以多線程使用ConcurrentDictionary集合還是比較好的選擇。

本來想把本篇放到我的《C#異步編程系列》,不過後來感覺那個系列寫的已經算是收尾了,而且以後還會有寫更多core源碼分析的文字,所以就單獨新增一個系列把。

ConcurrentDictionary內部私有類

先上源碼,再仔細聊

/// <summary>
/// Tables that hold the internal state of the ConcurrentDictionary
///
/// Wrapping the three tables in a single object allows us to atomically
/// replace all tables at once.
/// </summary> private sealed class Tables { // A singly-linked list for each bucket. // 單鏈表數據結構的桶,裏面的節點就是對應字典值 internal readonly Node[] _buckets; // A set of locks, each guarding a section of the table. //鎖的數組 internal readonly object[] _locks; // The number of elements guarded by each lock.
internal volatile int[] _countPerLock; internal Tables(Node[] buckets, object[] locks, int[] countPerLock) { _buckets = buckets; _locks = locks; _countPerLock = countPerLock; } } /// <summary> /// A node in a singly-linked list representing a particular hash table bucket. /// 由Dictionary裏的Entry改成Node,並且把next放到Node裏 /// </summary> private sealed class Node { internal readonly TKey _key; internal TValue _value; internal volatile Node _next; internal readonly int _hashcode; internal Node(TKey key, TValue value, int hashcode, Node next) { _key = key; _value = value; _next = next; _hashcode = hashcode; } } private volatile Tables _tables; // Internal tables of the dictionary private IEqualityComparer<TKey> _comparer; // Key equality comparer // The maximum number of elements per lock before a resize operation is triggered // 每個鎖對應的元素最大個數,如果超過,要重新進行resize tables private int _budget;

  首先,內部類定義為私有且密封,這樣就保證了無法從外部進行篡改,而且註意volatile關鍵字的使用,這確保了我們多線程操作的時候,最終都是去內存中讀取對應地址的值和操作對應地址的值。

internal readonly object[] _locks;
internal volatile int[] _countPerLock;

以上兩個類是為了高性能及並發鎖所建立的對象,實際方法上鎖時,使用如下語句

lock (tables._locks[lockNo])
Monitor.Enter(tables._locks[lockNo], ref lockTaken);

  以上兩種調用方式是等價的,都會阻塞執行,直到獲取到鎖(對於Monitor我很多時候會盡可能使用TryEnter,畢竟不阻塞,不過這個類的實現一定要使用阻塞式的,這樣程序邏輯才能繼續往下走。更多關於Monitor我在 《C#異步編程(四)混合模式線程同步》裏面有詳細介紹)

這樣,實現了顆粒化到每個單獨的鍵值的鎖,最大限度的保證了並發。

這裏lockNo參數是通過GetBucketAndLockNo方法獲取的,方法通過out變量返回值。

/// <summary>
/// Computes the bucket and lock number for a particular key.
///這裏獲取桶的索引和鎖的索引,註意,鎖的索引和桶未必是同一個值。 
/// </summary>
private static void GetBucketAndLockNo(int hashcode, out int bucketNo, out int lockNo, int bucketCount, int lockCount)
{
    bucketNo = (hashcode & 0x7fffffff) % bucketCount;
    lockNo = bucketNo % lockCount;
}

上面方法中

hashcode 是通過private IEqualityComparer<TKey> _comparer對象的GetHashCode方法通過key獲取到的。

bucketCount是整個table的長度。

lockCount是現有的鎖的數組

TryAdd方法

  我們從最簡單的TryAdd方法開始介紹,這裏ConcurrentDictionary類的封裝非常合理,暴露出來的方法,很多是通過統一的內部方法進行執行,比如更新刪除等操作等,都有類內部唯一的私有方法進行執行,然後通過向外暴漏各種參數不同的方法,來實現不同行為。

public bool TryAdd(TKey key, TValue value)
{
    if (key == null) ThrowKeyNullException();
    TValue dummy;
    return TryAddInternal(key, _comparer.GetHashCode(key), value, false, true, out dummy);
}

上面TryAddInternal的參數對應如下

/// <summary>
/// Shared internal implementation for inserts and updates.
/// If key exists, we always return false; and if updateIfExists == true we force update with value;
/// If key doesn‘t exist, we always add value and return true;
/// </summary>
private bool TryAddInternal(TKey key, int hashcode, TValue value, bool updateIfExists, bool acquireLock, out TValue resultingValue)

也就說說,updateIfExists為false,存在值的情況下,TryAdd不會更新原有值,而是直接返回false。我的多線程並發寫庫就是利用了這個特性,這個案例我會在本文最後介紹。現在我們來看TryAddInternal內部,廢話不多說,上源碼(大部分都註釋過了,所以直接閱讀即可)

//while包在外面,為了continue,如果發生了_tables私有變量在操作過程被其他線程修改的情況
while (true)
{
    int bucketNo, lockNo;
    //變量復制到方法本地變量  判斷tables是否在操作過程中被其他線程修改。
    Tables tables = _tables;
    //提到過的獲取桶的索引和鎖的索引
    GetBucketAndLockNo(hashcode, out bucketNo, out lockNo, tables._buckets.Length, tables._locks.Length);
    //是否要擴大tables
    bool resizeDesired = false;
    //是否成功獲取鎖,成功的話會在final塊中進行退出
    bool lockTaken = false;
    try
    {
        if (acquireLock)
            Monitor.Enter(tables._locks[lockNo], ref lockTaken);

        // If the table just got resized, we may not be holding the right lock, and must retry.
        // This should be a rare occurrence.
        if (tables != _tables)
        {
            continue;
        }

        // Try to find this key in the bucket
        Node prev = null;
        //這裏如果找到對應地址為空,會直接跳出循環,說明對應的key沒有添加鍋
        //不為空的時候,會進行返回false(具體是否更新根據updateIfExists)(當然也存在會有相同_hashcode值的情況,所以還要對key進行判定,key不同,繼續往後找,直到找到相同key)
        for (Node node = tables._buckets[bucketNo]; node != null; node = node._next)
        {
            Debug.Assert((prev == null && node == tables._buckets[bucketNo]) || prev._next == node);
            //對hashcode和key進行判定,確保找到的就是要更新的
            if (hashcode == node._hashcode && _comparer.Equals(node._key, key))
            {
                // The key was found in the dictionary. If updates are allowed, update the value for that key.
                // We need to create a new node for the update, in order to support TValue types that cannot
                // be written atomically, since lock-free reads may be happening concurrently.
                if (updateIfExists)
                {
                    if (s_isValueWriteAtomic)
                    {
                        node._value = value;
                    }
                    else
                    {
                        Node newNode = new Node(node._key, value, hashcode, node._next);
                        if (prev == null)
                        {
                            Volatile.Write(ref tables._buckets[bucketNo], newNode);
                        }
                        else
                        {
                            prev._next = newNode;
                        }
                    }
                    resultingValue = value;
                }
                else
                {
                    resultingValue = node._value;
                }
                return false;
            }
            prev = node;
        }

        // The key was not found in the bucket. Insert the key-value pair.
        Volatile.Write<Node>(ref tables._buckets[bucketNo], new Node(key, value, hashcode, tables._buckets[bucketNo]));
        checked
        {
            tables._countPerLock[lockNo]++;
        }

        //
        // If the number of elements guarded by this lock has exceeded the budget, resize the bucket table.
        // It is also possible that GrowTable will increase the budget but won‘t resize the bucket table.
        // That happens if the bucket table is found to be poorly utilized due to a bad hash function.
        //
        if (tables._countPerLock[lockNo] > _budget)
        {
            resizeDesired = true;
        }
    }
    finally
    {
        if (lockTaken)
            Monitor.Exit(tables._locks[lockNo]);
    }

    //
    // The fact that we got here means that we just performed an insertion. If necessary, we will grow the table.
    //
    // Concurrency notes:
    // - Notice that we are not holding any locks at when calling GrowTable. This is necessary to prevent deadlocks.
    // - As a result, it is possible that GrowTable will be called unnecessarily. But, GrowTable will obtain lock 0
    //   and then verify that the table we passed to it as the argument is still the current table.
    //
    if (resizeDesired)
    {
        GrowTable(tables);
    }

    resultingValue = value;
    return true;
}

ContainsKey和TryGetValue

ContainsKey和TryGetValue其實內部最後調用的都是私有TryGetValueInternal,這裏ContainsKey調用TryGetValue。

ContainsKey方法

/// <summary>
/// Determines whether the ConcurrentDictionary{TKey, TValue} contains the specified key.
/// </summary>
/// <param name="key">The key to locate in the</param>
/// <returns>true if the ConcurrentDictionary{TKey, TValue} contains an element withthe specified key; otherwise, false.</returns>
public bool ContainsKey(TKey key)
{
    if (key == null) ThrowKeyNullException();
    TValue throwAwayValue;
    return TryGetValue(key, out throwAwayValue);
}

TryGetValue方法

/// <summary>
/// Attempts to get the value associated with the specified key from the ConcurrentDictionary{TKey,TValue}.
/// </summary>
/// <param name="key">The key of the value to get.</param>
/// <param name="value">When this method returns, <paramref name="value"/> contains the object from
/// the ConcurrentDictionary{TKey,TValue} with the specified key or the default value of
/// <returns>true if the key was found in the <see cref="ConcurrentDictionary{TKey,TValue}"/>;
/// otherwise, false.</returns>
public bool TryGetValue(TKey key, out TValue value)
{
    if (key == null) ThrowKeyNullException();
    return TryGetValueInternal(key, _comparer.GetHashCode(key), out value);
}

TryGetValueInternal方法

private bool TryGetValueInternal(TKey key, int hashcode, out TValue value)
{
    //用本地變量保存這個table的快照。
    // We must capture the _buckets field in a local variable. It is set to a new table on each table resize.
Tables tables = _tables;
//獲取key對應的桶位置
    int bucketNo = GetBucket(hashcode, tables._buckets.Length);
    // We can get away w/out a lock here.
    // The Volatile.Read ensures that we have a copy of the reference to tables._buckets[bucketNo].
    // This protects us from reading fields (‘_hashcode‘, ‘_key‘, ‘_value‘ and ‘_next‘) of different instances.
Node n = Volatile.Read<Node>(ref tables._buckets[bucketNo]);
//如果key相符 ,賦值,不然繼續尋找下一個。
    while (n != null)
    {
        if (hashcode == n._hashcode && _comparer.Equals(n._key, key))
        {
            value = n._value;
            return true;
        }
        n = n._next;
    }
    value = default(TValue);//沒找到就賦默認值
    return false;
}

TryRemove

TryRemove方法

public bool TryRemove(TKey key, out TValue value)
{
    if (key == null) ThrowKeyNullException();
    return TryRemoveInternal(key, out value, false, default(TValue));
}

這個方法會調用內部私用的TryRemoveInternal

/// <summary>
/// Removes the specified key from the dictionary if it exists and returns its associated value.
/// If matchValue flag is set, the key will be removed only if is associated with a particular
/// value.
/// </summary>
/// <param name="key">The key to search for and remove if it exists.</param>
/// <param name="value">The variable into which the removed value, if found, is stored.</param>
/// <param name="matchValue">Whether removal of the key is conditional on its value.</param>
/// <param name="oldValue">The conditional value to compare against if <paramref name="matchValue"/> is true</param>
/// <returns></returns>
private bool TryRemoveInternal(TKey key, out TValue value, bool matchValue, TValue oldValue)
{
    int hashcode = _comparer.GetHashCode(key);
    while (true)
    {
        Tables tables = _tables;
        int bucketNo, lockNo;
        //這裏獲取桶的索引和鎖的索引,註意,鎖的索引和桶未必是同一個值,具體算法看源碼。
        GetBucketAndLockNo(hashcode, out bucketNo, out lockNo, tables._buckets.Length, tables._locks.Length);
        //這裏鎖住的只是對應這個index指向的鎖,而不是所有鎖。
        lock (tables._locks[lockNo])
        {
            //這裏table可能被重新分配,所以這裏再次獲取,看得到的是不是同一個table
            // If the table just got resized, we may not be holding the right lock, and must retry.
            // This should be a rare occurrence.
            if (tables != _tables)
            {
                continue;
            }

            Node prev = null;
            //這裏同一個桶,可能因為連地址,有很多值,所以要對比key
            for (Node curr = tables._buckets[bucketNo]; curr != null; curr = curr._next)
            {
                Debug.Assert((prev == null && curr == tables._buckets[bucketNo]) || prev._next == curr);
                //對比是不是要刪除的的那個元素
                if (hashcode == curr._hashcode && _comparer.Equals(curr._key, key))
                {
                    if (matchValue)
                    {
                        bool valuesMatch = EqualityComparer<TValue>.Default.Equals(oldValue, curr._value);
                        if (!valuesMatch)
                        {
                            value = default(TValue);
                            return false;
                        }
                    }
                    //執行刪除,判斷有沒有上一個節點。然後修改節點指針或地址。
                    if (prev == null)
                    {
                        Volatile.Write<Node>(ref tables._buckets[bucketNo], curr._next);
                    }
                    else
                    {
                        prev._next = curr._next;
                    }

                    value = curr._value;
                    tables._countPerLock[lockNo]--;
                    return true;
                }
                prev = curr;
            }
        }
        value = default(TValue);
        return false;
    }
}

我的使用實例

之前做項目時候,有個奇怪的場景,就是打電話的時候回調接口保存通話記錄,這裏通過CallId來唯一識別每次通話,但是前端程序是通過websocket跟通話服務建立連接(通話服務是另外一個公司做的)。客戶是呼叫中心,一般在網頁端都是多個頁面操作,所以會有多個websocket連接,這時候每次通話,每個頁面都會回調接口端,保存相同的通話記錄,並發是同一時間的。

我們最早考慮使用消息隊列來過濾重復的請求,但是我仔細考慮了下,發現使用ConcurrentDictionary方式的實現更簡單,具體實現如下(我精簡了下代碼):

private  static ConcurrentDictionary<string,string> _strDic=new ConcurrentDictionary<string, string>();
public async Task<BaseResponse> AddUserByAccount(string callId)
{
    if ( _strDic.ContainsKey(callId))
    {
        return BaseResponse.GetBaseResponse(BusinessStatusType.Failed,"鍵值已存在");
    }
    //成功寫入
    if (_strDic.TryAdd(callId,callId))
    {
        var  recordExist =await _userRepository.FirstOrDefaultAsync(c => c.CallId == callId);
        if (recordExist ==null)
        {
            Record record=new Record
            {
                CallId = callId,
                …………
                …………
                IsVerify=1
            };
            _userRepository.Insert(record);
            _userRepository.SaveChanges();
        } 
        return BaseResponse.GetBaseResponse(BusinessStatusType.OK);
    }
    //嘗試競爭線程,寫入失敗
    return BaseResponse.GetBaseResponse(BusinessStatusType.Failed,"寫入失敗");
}

  這裏如果進行同時的並發請求,最後請求都可以通過if ( _strDic.ContainsKey(callId))的判定,因為所有線程同時讀取,都是未寫入狀態。但是多個線程會在TryAdd時有競爭,而且ConcurrentDictionary的實現保證了只有一個線程可以成功更新,其他的都返回失敗。

一起來看CORE源碼(一) ConcurrentDictionary