1. 程式人生 > >曹工說JDK原始碼(2)--ConcurrentHashMap的多執行緒擴容,說白了,就是分段取任務

曹工說JDK原始碼(2)--ConcurrentHashMap的多執行緒擴容,說白了,就是分段取任務

# 前言 先預先說明,我這邊jdk的程式碼版本為1.8.0_11,同時,因為我直接在本地jdk原始碼上進行了部分修改、除錯,所以,導致大家看到的我這邊貼的程式碼,和大家的不太一樣。 不過,我對原始碼進行修改、重構時,會保證和原始程式碼的功能、邏輯嚴格一致,更多時候,可能只是修改變數名,方便理解。 大家也知道,jdk程式碼寫得實在是比較深奧,變數名經常都是單字元,i,j,k啥的,實在是很難理解,所以,我一般會根據自己的理解,去重新命名,為了減輕我們的頭腦負擔。 至於怎麼去修改程式碼並除錯,可以參考我之前的文章: [曹工力薦:除錯 jdk 中 rt.jar 包部分的原始碼(可自由增加註釋,修改程式碼並debug)](https://www.cnblogs.com/grey-wolf/p/12817615.html) 文章中,我改過的程式碼放在: https://gitee.com/ckl111/jdk-debug # sizeCtl field的初始化 大家知道,concurrentHashMap底層是陣列+連結串列+紅黑樹,陣列的長度假設為n,在hashmap初始化的時候,這個n除了作為陣列長度,還會作為另一個關鍵field的值。 ```java /** * Table initialization and resizing control. When negative, the * table is being initialized or resized: -1 for initialization, * else -(1 + the number of active resizing threads). Otherwise, * when table is null, holds the initial table size to use upon * creation, or 0 for default. After initialization, holds the * next element count value upon which to resize the table. */ private transient volatile int sizeCtl; ``` 該欄位非常關鍵,根據取值不同,有不同的功能。 ## 使用預設建構函式時 ```java public ConcurrentHashMap() { } ``` 此時,sizeCtl被初始化為0. ![](https://img2020.cnblogs.com/blog/519126/202006/519126-20200607122049598-356913015.png) ## 使用帶初始容量的建構函式時 ![](https://img2020.cnblogs.com/blog/519126/202006/519126-20200607122423522-1640352275.png) 此時,sizeCtl也是32,和容量一致。 ## 使用另一個map來初始化時 ```java public ConcurrentHashMap(Map m) { this.sizeCtl = DEFAULT_CAPACITY; putAll(m); } ``` 此時,sizeCtl,直接使用了預設值,16. ## 使用初始容量、負載因子來初始化時 ```java public ConcurrentHashMap(int initialCapacity, float loadFactor) { this(initialCapacity, loadFactor, 1); } ``` 這裡過載了: ![](https://img2020.cnblogs.com/blog/519126/202006/519126-20200607123004548-1008734630.png) 這裡,我們傳入的負載因子為0.75,這也是預設的負載因子,傳入的初始容量為14. 這裡面會根據: 1 + 14/0.75 = 19,拿到真正的size,然後根據size,獲取到第一個大於19的2的n次方,即32,來作為陣列容量,然後sizeCtl也被設定為32. # initTable時,對sizeCtl field的修改 實際上,new一個hashmap的時候,我們並沒有建立支撐陣列,那,什麼時候建立陣列呢?是在真正往裡面放資料的時候,比如put的時候。 ```java /** Implementation for put and putIfAbsent */ final V putVal(K key, V value, boolean onlyIfAbsent) { if (key == null || value == null) throw new NullPointerException(); int hash = spread(key.hashCode()); int binCount = 0; ConcurrentHashMapPutResultVO vo = new ConcurrentHashMapPutResultVO(); vo.setBinCount(0); for (Node[] tab = table;;) { int tableLength; // 1 if (tab == null) { tab = initTable(); continue; } ... } ``` 1處,即會去初始化table。 ```java /** * Initializes table, using the size recorded in sizeCtl. * 初始化hashmap,使用sizeCtl作為容量 */ private final Node[] initTable() { Node[] tab; int sc; while ((tab = table) == null || tab.length == 0) { sc = sizeCtl; if (sc < 0){ Thread.yield(); // lost initialization race; just spin continue; } /** * 走到這裡,說明sizeCtl大於0,大於0,代表什麼,可以去看下其建構函式,此時,sizeCtl表示 * capacity的大小。 * {@link #ConcurrentHashMap(int)} * * cas修改為-1,如果成功修改為-1,則表示搶到了鎖,可以進行初始化 * */ // 1 boolean bGotChanceToInit = U.compareAndSwapInt(this, SIZECTL, sc, -1); if (bGotChanceToInit) { try { tab = table; /** * 如果當前表為空,尚未初始化,則進行初始化,分配空間 */ if (tab == null || tab.length == 0) { /** * sc大於0,則以sc為準,否則使用預設的容量 */ int n = (sc >
0) ? sc : DEFAULT_CAPACITY; Node[] nt = (Node[]) new Node[n]; table = tab = nt; /** * n >>> 2,無符號右移2位,則是n的四分之一。 * n- n/4,結果為3/4 * n * 則,這裡修改sc為 3/4 * n * 比如,預設容量為16,則修改sc為12 */ // 2 sc = n - (n >>> 2); } } finally { /** * 修改sizeCtl到field */ // 3 sizeCtl = sc; } break; } } return tab; } ``` * 1處,cas修改sizeCtl為-1,成功了的,獲得初始化table的權利 * 2處,修改區域性變數sc為: n - (n >>> 2),也就是修改為 0.75n,假設此時的陣列容量為16,那麼sc就是16 * 0.75 = 12. * 3處,將sc賦值到field: sizeCtl 經過上面的分析,initTable時,這個欄位可能有兩種取值: * -1,有執行緒正在對該table進行初始化 * 0.75*陣列長度,此時,已經初始化完成 上面說的是,在put的時候去initTable,實際上,這個initTable,也會在以下函式中被呼叫,其共同點就是,都是往裡面放資料的操作: ![](https://img2020.cnblogs.com/blog/519126/202006/519126-20200607213859439-334499086.png) # 擴容時機 上面說了很多,目前,我們知道的是,在initTable後,sizeCtl的值,是舊的陣列的長度 * 0.75。 接下來,我們看看擴容時機,在put時,會呼叫putVal,這個函式的大體步驟: ```java final V putVal(K key, V value, boolean onlyIfAbsent) { if (key == null || value == null) throw new NullPointerException(); // 1 int hash = spread(key.hashCode()); int binCount = 0; System.out.println("binCount:" + binCount); // 2 ConcurrentHashMapPutResultVO vo = new ConcurrentHashMapPutResultVO(); vo.setBinCount(0); for (Node[] tab = table;;) { int tableLength; // 3 if (tab == null) { tab = initTable(); continue; } tableLength = tab.length; if (tableLength == 0) { tab = initTable(); continue; } int entryNodeHashCode; // 4 int entryNodeIndex = (tableLength - 1) & hash; Node entryNode = tabAt(tab,entryNodeIndex); /** * 5 如果我們要放的桶,還是個空的,則直接cas放進去 */ if (entryNode == null) { Node node = new Node<>(hash, key, value, null); // no lock when adding to empty bin boolean bSuccess = casTabAt(tab, entryNodeIndex, null, node); if (bSuccess) { break; } else { /** * 如果沒成功,則繼續下一輪迴圈 */ continue; } } entryNodeHashCode = entryNode.hash; /** * 6 如果要放的這個桶,正在遷移,則幫助遷移 */ if (entryNodeHashCode == MOVED){ tab = helpTransfer(tab, entryNode); continue; } /** * 7 對entryNode加鎖 */ V oldVal = null; System.out.println("sync"); synchronized (entryNode) { /** * 這一行是判斷,在我們執行前面的一堆方法的時候,看看entryNodeIndex處的node是否變化 */ if (tabAt(tab, entryNodeIndex) != entryNode) { continue; } /** * 8 hashCode大於0,說明不是處於遷移狀態 */ if (entryNodeHashCode >= 0) { /** * 9 連結串列中找到合適的位置並放入 */ findPositionAndPut(key, value, onlyIfAbsent, hash, vo, entryNode); binCount = vo.getBinCount(); oldVal = (V) vo.getOldValue(); } else if (entryNode instanceof TreeBin) { ... } } System.out.println("binCount:" + binCount); // 10 if (binCount != 0) { if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, entryNodeIndex); if (oldVal != null) return oldVal; break; } } // 11 addCount(1L, binCount); return null; } ``` * 1處,計算key的hashcode * 2處,我這邊new了一個物件,裡面兩個欄位: ```java public class ConcurrentHashMapPutResultVO { int binCount; V oldValue; } ``` 其中,oldValue用來存放,如果put進去的key/value,其中key已經存在的話,一般會直接覆蓋之前的舊值,這裡主要存放之前的舊值,因為我們需要返回舊值。 binCount,則存放:在找到對應的hash桶之後,在連結串列中,遍歷了多少個元素,該值後面會使用,作為一個標誌,當該標誌大於0的時候,才去進一步檢查,看看是否擴容。 * 3處,如果table為null,說明table裡沒有任何一個鍵值對,陣列也還沒建立,則初始化table * 4處,根據hashcode,和(陣列長度 - 1)相與,計算出應該存放的雜湊桶在陣列中的索引 * 5處,如果要放的雜湊桶,還是空的,則直接cas設定進去,成功則跳出迴圈,否則重試 * 6處,如果要放的這個桶,該節點的hashcode為MOVED(一個常量,值為-1),說明有其他執行緒正在擴容該hashmap,則幫助擴容 * 7處,對要存放的hash桶的頭節點加鎖 * 8處,如果頭節點的hashcode大於0,說明是拉了一條連結串列,則呼叫子方法(我這邊自己抽的),去找到合適的位置並插入到連結串列 * 9處,findPositionAndPut,在連結串列中,找到合適的位置,並插入 * 10處,在findPositionAndPut函式中,會返回:為了找到合適的位置,遍歷了多少個元素,這個值,就是binCount。 如果這個binCount大於8,則說明遍歷了8個元素,則需要轉紅黑樹了。 * 11處,因為我們新增了一個元素,總數自然要加1,這裡面會去增加總數,和檢查是否需要擴容。 其中,第9步,因為是自己抽的函式,所以這裡貼出來給大家看下: ```java /** * 遍歷連結串列,找到應該放的位置;如果遍歷完了還沒找到,則放到最後 * @param key * @param value * @param onlyIfAbsent * @param hash * @param vo * @param entryNode */ private void findPositionAndPut(K key, V value, boolean onlyIfAbsent, int hash, ConcurrentHashMapPutResultVO vo, Node entryNode) { vo.setBinCount(1); for (Node currentIterateNode = entryNode; ; vo.setBinCount(vo.getBinCount() + 1)) { /** * 如果當前遍歷指向的節點的hash值,與引數中的key的hash值相等,則, * 繼續判斷 */ K currentIterateNodeKey = currentIterateNode.key; boolean bKeyEqualOrNot = Objects.equals(currentIterateNodeKey, key); /** * key的hash值相等,且equals比較也相等,則就是我們要找的 */ if (currentIterateNode.hash == hash && bKeyEqualOrNot) { /** * 獲取舊的值 */ vo.setOldValue(currentIterateNode.val); /** * 覆蓋舊的node的val */ if (!onlyIfAbsent) currentIterateNode.val = value; // 這裡直接break跳出迴圈 break; } /** * 把當前節點儲存起來 */ Node pred = currentIterateNode; /** * 獲取下一個節點 */ currentIterateNode = currentIterateNode.next; /** * 如果下一個節點為null,說明當前已經是連結串列的最後一個node了 */ if ( currentIterateNode == null) { /** * 則在當前節點後面,掛上新的節點 */ pred.next = new Node(hash, key, value, null); break; } } } ``` 第11步,也是我們要看的重點: ```java private final void addCount(long delta, int check) { CounterCell[] counterCellsArray = counterCells; // 1 long b = baseCount; // 2 long newBaseCount = b + delta; /** * 3 直接cas在baseCount上增加 */ boolean bSuccess = U.compareAndSwapLong(this, BASECOUNT, b, newBaseCount); if ( counterCellsArray != null || !bSuccess) { ... newBaseCount = sumCount(); } // 4 if (check >= 0) { while (true) { Node[] tab = table; Node[] nt; int n = 0; // 5 int sc = sizeCtl; // 6 boolean bSumExteedSizeControl = newBaseCount >= (long) sc; // 7 boolean bContinue = bSumExteedSizeControl && tab != null && (n = tab.length) < MAXIMUM_CAPACITY; if (bContinue) { int rs = resizeStamp(n); if (sc < 0) { if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0) break; if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) transfer(tab, nt); } else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) // 8 transfer(tab, null); newBaseCount = sumCount(); } else { break; } } } } ``` * 1處,baseCount是一個field,儲存當前hashmap中,有多少個鍵值對,你put一次,就一個;remove一次,就減一個。 * 2處,b + delta,其中,b就是baseCount,是舊的數量;dalta,我們傳入的是1,就是要增加的元素數量 所以,b + delta,得到的,就是經過這次put後,預期的數量 * 3處,直接cas,修改baseCount這個field為 新值,也就是第二步拿到的值。 * 4處,這裡檢查check是否大於0,check,是第二個形參;這個引數,我們外邊怎麼傳的? `addCount(1L, binCount);` 不就是bincount嗎,也就是說,這裡檢查:我們在put過程中,在連結串列中遍歷了幾個元素,如果遍歷了至少1個元素,這裡要進入下面的邏輯:檢查是否要擴容,因為,你binCount大於0,說明可能已經開始出現雜湊衝突了。 * 5處,取field:sizeCtl的值,給區域性變數sc * 6處,判斷當前的新的鍵值對總數,是否大於sc了;比如容量是16,那麼sizeCtl是12,如果此時,hashmap中存放的鍵值對已經大於等於12了,則要檢查是否擴容了 * 7處,幾個組合條件,檢視是否要擴容,其中,主要的條件就是第6步的那個。 * 8處,呼叫transfer,進行擴容 總結一下,經過前面的第6處,我們知道,如果存放的鍵值對總數,已經大於等於0.75*雜湊桶(也就是底層陣列的長度)的數量了,那麼,就基本要擴容了。 # 擴容的大體過程 擴容也是一個相對複雜的過程,這裡只說大概,詳細的放下講。 假設,現在底層陣列長度,128,也就是128個雜湊桶,當存放的鍵值對數量,大於等於 128 * 0.75的時候,就會開始擴容,擴容的過程,大概是: * 申請一個256(容量翻倍)的陣列 * 現在有128個桶,相當於,需要對128個桶進行遍歷,遍歷每個桶拉出去的連結串列或紅黑樹,檢視每個鍵值對,是需要放到新陣列的什麼位置 這個過程,昨天的博文,畫了個圖,這裡再貼一下。 ![](https://img2020.cnblogs.com/blog/519126/202006/519126-20200606213956396-221625310.png) 擴容後: ![](https://img2020.cnblogs.com/blog/519126/202006/519126-20200606214550816-1904315910.png) 可是,如果我們要一個個去遍歷所有雜湊桶,然後遍歷對應的連結串列/紅黑樹,會不會太慢了?完全是單執行緒工作啊。 換個思路,我們能不能加快點呢?比如,執行緒1可以去處理陣列的 0 -15這16個桶,16- 31這16個桶,完全可以讓執行緒2去做啊,這樣的話,不就多執行緒了嗎,不是就快了嗎? 沒錯,jdk就是這麼幹的。 jdk維護了一個field,這個field,專門用來存當前可以獲取的任務的索引,舉個例子: ![](https://img2020.cnblogs.com/blog/519126/202006/519126-20200607222235395-450319366.png) 大家看上圖就懂了,一開始,這裡假設我們有128個桶,每次每個執行緒,去拿16個桶來處理。 剛開始的時候,field:transferIndex就等於127,也就是最後一個桶的位置,然後我們要從後往前取,那麼,127 到112,剛好就是16個桶,所以,申請任務的時候,就會用cas去更新field為112,則表示,自己取到了112 到127這一個區間的hash桶遷移任務。 如果自始至終,只有一個執行緒呢,它處理完了112 - 127這一批hash桶後,會繼續取下一波任務,96 - 112;以此類推。 如果多執行緒的話呢,也是類似的,反正都是去嘗試cas更新transferIndex的值為任務區間的開始下標的值,成功了,就算任務認領成功了。 多執行緒,怎麼知道需要去幫助擴容呢? 發起擴容的執行緒,在處理完bucket[k]時,會把老的table中的對應的bucket[k]的頭節點,修改為下面這種型別的節點: ```java static final class ForwardingNode extends Node { final Node[] nextTable; ForwardingNode(Node[] tab) { super(MOVED, null, null, null); this.nextTable = tab; } } ``` 其他執行緒,在put或者其他操作時,發現頭結點變成了這個,就會去協助擴容了。 # 多執行緒擴容,和分段取任務的差別? 我個人感覺,差別不大,多執行緒擴容,就是多執行緒去獲取自己的那一段任務,然後來完成。我這邊寫了簡單的demo,不過感覺還是很有用的,可以幫助我們理解。 ```java import sun.misc.Unsafe; import java.lang.reflect.Field; import java.util.concurrent.*; import java.util.concurrent.locks.LockSupport; public class ConcurrentTaskFetch { /** * 空閒任務索引,獲取任務時,從該下標開始,往前獲取。 * 比如當前下標為10,表示tasks陣列中,0-10這個區間的任務,沒人領取 */ // 0 private volatile int freeTaskIndexForFetch; // 1 private static final int TASK_COUNT_PER_FETCH = 16; // 2 private String[] tasks = new String[128]; public static void main(String[] args) { ConcurrentTaskFetch fetch = new ConcurrentTaskFetch(); // 3 fetch.init(); ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 10, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100)); executor.prestartAllCoreThreads(); CyclicBarrier cyclicBarrier = new CyclicBarrier(10); // 4 for (int i = 0; i < 10; i++) { executor.execute(new Runnable() { @Override public void run() { try { cyclicBarrier.await(); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } // 5 FetchedTaskInfo fetchedTaskInfo = fetch.fetchTask(); if (fetchedTaskInfo != null) { System.out.println("thread:" + Thread.currentThread().getName() + ",get task success:" + fetchedTaskInfo); try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("thread:" + Thread.currentThread().getName() + ", process task finished"); } } }); } LockSupport.park(); } public void init() { for (int i = 0; i < 128; i++) { tasks[i] = "task" + i; } freeTaskIndexForFetch = tasks.length; } // 6 public FetchedTaskInfo fetchTask() { System.out.println("Thread start fetch task:"+Thread.currentThread().getName()+",time: "+System.currentTimeMillis()); while (true){ // 6.1 if (freeTaskIndexForFetch == 0) { System.out.println("thread:" + Thread.currentThread().getName() + ",get task failed,there is no task"); return null; } /** * 6.2 獲取當前任務的集合的上界 */ int subTaskListEndIndex = this.freeTaskIndexForFetch; /** * 6.3 獲取當前任務的集合的下界 */ int subTaskListStartIndex = subTaskListEndIndex >
TASK_COUNT_PER_FETCH ? subTaskListEndIndex - TASK_COUNT_PER_FETCH : 0; /** * 6.4 * 現在,我們拿到了集合的上下界,即[subTaskListStartIndex,subTaskListEndIndex) * 該區間為前開後閉,所以,實際的區間為: * [subTaskListStartIndex,subTaskListEndIndex - 1] */ /** * 6.5 使用cas,嘗試更新{@link freeTaskIndexForFetch} 為 subTaskListStartIndex */ if (U.compareAndSwapInt(this, FREE_TASK_INDEX_FOR_FETCH, subTaskListEndIndex, subTaskListStartIndex)) { // 6.6 FetchedTaskInfo info = new FetchedTaskInfo(); info.setStartIndex(subTaskListStartIndex); info.setEndIndex(subTaskListEndIndex - 1); return info; } } } // Unsafe mechanics private static final sun.misc.Unsafe U; private static final long FREE_TASK_INDEX_FOR_FETCH; static { try { // U = sun.misc.Unsafe.getUnsafe(); Field f = Unsafe.class.getDeclaredField("theUnsafe"); f.setAccessible(true); U = (Unsafe) f.get(null); Class k = ConcurrentTaskFetch.class; FREE_TASK_INDEX_FOR_FETCH = U.objectFieldOffset (k.getDeclaredField("freeTaskIndexForFetch")); } catch (Exception e) { throw new Error(e); } } static class FetchedTaskInfo{ int startIndex; int endIndex; public int getStartIndex() { return startIndex; } public void setStartIndex(int startIndex) { this.startIndex = startIndex; } public int getEndIndex() { return endIndex; } public void setEndIndex(int endIndex) { this.endIndex = endIndex; } @Override public String toString() { return "FetchedTaskInfo{" + "startIndex=" + startIndex + ", endIndex=" + endIndex + '}'; } } } ``` * 0處,定義了一個field,類似於前面的transferIndex ```java /** * 空閒任務索引,獲取任務時,從該下標開始,往前獲取。 * 比如當前下標為10,表示tasks陣列中,0-10這個區間的任務,沒人領取 */ // 0 private volatile int freeTaskIndexForFetch; ``` * 1,定義了每次取多少個任務,這裡也是16個 ```java private static final int TASK_COUNT_PER_FETCH = 16; ``` * 2,定義任務列表,共128個任務 * 3,main函式中,進行任務初始化 ```java public void init() { for (int i = 0; i < 128; i++) { tasks[i] = "task" + i; } freeTaskIndexForFetch = tasks.length; } ``` 主要初始化任務列表,其次,將freeTaskIndexForFetch 賦值為128,後續取任務,從這個下標開始 * 4處,啟動10個執行緒,每個執行緒去執行取任務,按理說,我們128個任務,每個執行緒取16個,只能有8個執行緒取到任務,2個執行緒取不到 * 5處,執行緒邏輯裡,去獲取任務 * 6處,獲取任務的方法定義 * 6.1 ,如果可獲取的任務索引為0了,說明沒任務了,直接返回 * 6.2,獲取當前任務的集合的上界 * 6.3,獲取當前任務的集合的下界,減去16就行了 * 6.4,拿到了集合的上下界,即[subTaskListStartIndex,subTaskListEndIndex) * 6.5, 使用cas,更新field為:6.4中的任務下界。 執行效果演示: ![](https://img2020.cnblogs.com/blog/519126/202006/519126-20200607223859729-95854827.png) 可以看到,8個執行緒取到任務,2個執行緒沒取到。 # 該思想在記憶體分配時的應用 其實jvm記憶體分配時,也是類似的思路,比如,設定堆記憶體為200m,那這200m是啟動時立馬從作業系統分配了的。 接下來,就是每次new物件的時候,去這個大記憶體裡,找個小空間,這個過程,也是需要cas去競爭的,比如肯定也有個全域性的欄位,來表示當前可用記憶體的索引,比如該索引為100,表示,第100個位元組後的空間是可以用的,那我要new個物件,這個物件有3個欄位,需要大概30個位元組,那我是不是需要把這個索引更新為130。 這中間是多執行緒的,所以也是要cas操作。 道理都是類似的。 # 總結 時間倉促,有問題在所難免,歡迎及時指出或加群