1. 程式人生 > >[.net 多執行緒]ConcurrentBag原始碼分析

[.net 多執行緒]ConcurrentBag原始碼分析

ConcurrentBag根據操作執行緒,對不同執行緒分配不同的佇列進行資料操作。這樣,每個佇列只有一個執行緒在操作,不會發生併發問題。其內部實現運用了net4.0新加入的ThreadLocal執行緒本地儲存功能。各個佇列間通過連結串列維護。

其內部結構如下:

 

1、獲取執行緒本地佇列:

 1 /// <summary>
 2 /// 獲取當前執行緒的佇列
 3 /// </summary>
 4 /// <param name="forceCreate">如果執行緒沒有持有佇列,是否新建</param>
 5 /// <returns></returns>
6 private ThreadLocalList<T> GetThreadList(bool forceCreate) 7 { 8 //嘗試獲取執行緒本地佇列列表(參考ThreadLocal),此處的m_locals不同執行緒持有不同例項 9 //如果獲取為空,則說明執行緒是第一次執行此函式,需要分配一個佇列 10 ThreadLocalList<T> unownedList = this.m_locals.Value; 11 if (unownedList != null) 12 { 13 return unownedList;
14 } 15 if (forceCreate) 16 { 17 //獲取當前本地佇列鎖,防止在凍結佇列時產生衝突(參考FreezeBag函式) 18 object globalListsLock = this.GlobalListsLock; 19 lock (globalListsLock) 20 { 21 //獲取本地佇列 22 //如果沒有建立過佇列,則建立一個新的佇列;否則儘量分配已有的執行緒終止的佇列 23 if (this.m_headList == null
) 24 { 25 unownedList = new ThreadLocalList<T>(Thread.CurrentThread); 26 this.m_headList = unownedList; 27 this.m_tailList = unownedList; 28 } 29 else 30 { 31 //獲取無主佇列,不分配新佇列 32 unownedList = this.GetUnownedList(); 33 if (unownedList == null) 34 { 35 unownedList = new ThreadLocalList<T>(Thread.CurrentThread); 36 this.m_tailList.m_nextList = unownedList; 37 this.m_tailList = unownedList; 38 } 39 } 40 this.m_locals.Value = unownedList; 41 return unownedList; 42 } 43 } 44 return null; 45 }
獲取當前執行緒持有的佇列

2、獲取無主佇列

 1 /// <summary>
 2 /// 獲取無主佇列
 3 /// 如果當前佇列的持有執行緒已經終止,則為無主佇列
 4 /// </summary>
 5 /// <returns></returns>
 6 private ThreadLocalList<T> GetUnownedList()
 7 {
 8     for (ThreadLocalList<T> list = this.m_headList; list != null; list = list.m_nextList)
 9     {
10         if (list.m_ownerThread.ThreadState == System.Threading.ThreadState.Stopped)
11         {
12             list.m_ownerThread = Thread.CurrentThread;
13             return list;
14         }
15     }
16     return null;
17 }
獲取無主佇列

3、插入操作程式碼分析

 1 /// <summary>
 2 /// 向Bag新增元素
 3 /// </summary>
 4 /// <param name="item"></param>
 5        
 6 [__DynamicallyInvokable]
 7 public void Add(T item)
 8 {
 9     //獲取當前執行緒持有的佇列
10     ThreadLocalList<T> threadList = this.GetThreadList(true);
11     //向當前持有佇列新增資料
12     this.AddInternal(threadList, item);
13 }
14 
15 /// <summary>
16 /// 向佇列新增資料
17 /// </summary>
18 /// <param name="list"></param>
19 /// <param name="item"></param>
20 private void AddInternal(ThreadLocalList<T> list, T item)
21 {
22     bool lockTaken = false;
23     try
24     {
25         //CAS原子操作,設定標誌位,與Steal和Freeze實現互斥
26         Interlocked.Exchange(ref list.m_currentOp, 1);
27         //如果m_needSync,則說明已經發起凍結操作,需要加鎖保證執行緒安全
28         if ((list.Count < 2) || this.m_needSync)
29         {
30             list.m_currentOp = 0;
31             Monitor.Enter(list, ref lockTaken);
32         }
33         list.Add(item, lockTaken);
34     }
35     finally
36     {
37         list.m_currentOp = 0;
38         if (lockTaken)
39         {
40             Monitor.Exit(list);
41         }
42     }
43 }
插入操作

4、凍結Bag函式

 1 /// <summary>
 2 /// 凍結Bag,不能進行增,刪,獲取操作
 3 /// </summary>
 4 /// <param name="lockTaken"></param>
 5 private void FreezeBag(ref bool lockTaken)
 6 {
 7     //獲取當前執行緒list鎖
 8     Monitor.Enter(this.GlobalListsLock, ref lockTaken);
 9     //設定同步標誌位,增,刪,獲取操作識別此標誌位,只有獲取鎖才能執行
10     this.m_needSync = true;
11     //獲取所有list的鎖
12     this.AcquireAllLocks();
13     //等待所有操作執行完成
14     this.WaitAllOperations();
15 }
凍結bag

5、轉化成陣列

 1 /// <summary>
 2 /// 轉化為陣列
 3 /// </summary>
 4 /// <returns></returns>
 5 [__DynamicallyInvokable]
 6 public T[] ToArray()
 7 {
 8     T[] localArray;
 9     //沒有資料返回空陣列
10     if (this.m_headList == null)
11     {
12         return new T[0];
13     }
14     bool lockTaken = false;
15     try
16     {
17         //凍結bag
18         this.FreezeBag(ref lockTaken);
19         //轉化成List後直接轉成Array
20         localArray = this.ToList().ToArray();
21     }
22     finally
23     {
24         this.UnfreezeBag(lockTaken);
25     }
26     return localArray;
27 }
28 
29 /// <summary>
30 /// 轉化成list
31 /// </summary>
32 /// <returns></returns>
33 private List<T> ToList()
34 {
35     List<T> list = new List<T>();
36     //獲取所有list,遍歷生成副本
37     for (ThreadLocalList<T> list2 = this.m_headList; list2 != null; list2 = list2.m_nextList)
38     {
39         for (Node<T> node = list2.m_head; node != null; node = node.m_next)
40         {
41             list.Add(node.m_value);
42         }
43     }
44     return list;
45 }
轉化成陣列