LongAdder解析
對 LongAdder
的最初瞭解是從Coolshell上的一篇文章中獲得的,但是一直都沒有深入的瞭解過其實現,只知道它相較於 AtomicLong
來說,更加適合寫多讀少的併發情景。今天,我們就研究一下 LongAdder
的原理,探究一下它如此高效的原因。
基本原理和思想
Java有很多併發控制機制,比如說以AQS為基礎的鎖或者以CAS為原理的自旋鎖。不瞭解AQS的朋友可以閱讀我之前的AQS原始碼解析文章。一般來說,CAS適合輕量級的併發操作,也就是併發量並不多,而且等待時間不長的情況,否則就應該使用普通鎖,進入阻塞狀態,避免CPU空轉。
所以,如果你有一個Long型別的值會被多執行緒修改,那麼使用CAS進行併發控制比較好,但是如果你是需要鎖住一些資源,然後進行資料庫操作,那麼還是使用阻塞鎖比較好。
第一種情況下,我們一般都使用 AtomicLong
。 AtomicLong
是通過無限迴圈不停的採取CAS的方法去設定內部的value,直到成功為止。那麼當併發數比較多或出現更新熱點時,就會導致CAS的失敗機率變高,重試次數更多,越多的執行緒重試,CAS失敗的機率越高,形成惡性迴圈,從而降低了效率。
而LongAdder的原理就是降低對value更新的併發數,也就是將對單一value的變更壓力分散到多個value值上,降低單個value的“熱度”。
我們知道 LongAdder
的大致原理之後,再來詳細的瞭解一下它的具體實現,其中也有很多值得借鑑的併發程式設計的技巧。
LongAdder的成員變數
LongAdder
是 Striped64
的子類,其有三個比較重要的成員函式,在之後的函式分析中需要使用到,這裡先說明一下。
// CPU的數量 static final int NCPU = Runtime.getRuntime().availableProcessors(); // Cell物件的陣列,長度一般是2的指數 transient volatile Cell[] cells; // 基礎value值,當併發較低時,只累加該值 transient volatile long base; // 建立或者擴容Cells陣列時使用的自旋鎖變數 transient volatile int cellsBusy; 複製程式碼
cells
是 LongAdder
的父類 Striped64
中的 Cell
陣列型別的成員變數。每個 Cell
物件中都包含一個value值,並提供對這個value值的CAS操作。
static final class Cell { volatile long value; Cell(long x) { value = x; } final boolean cas(long cmp, long val) { return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val); } } 複製程式碼
Add操作
我們首先來看一下 LongAdder
的 add
函式,其會多次嘗試CAS操作將值進行累加,如果成功了就直接返回,失敗則繼續執行。程式碼比較複雜,而且涉及的情況比較多,我們就以梳理歷次嘗試CAS操作為主線,講清楚這些CAS操作的前提條件和場景。
public void add(long x) { Cell[] as; long b, v; int m; Cell a; // 當cells陣列為null時,會進行第一次cas操作嘗試。 if ((as = cells) != null || !casBase(b = base, b + x)) { boolean uncontended = true; if (as == null || (m = as.length - 1) < 0 || (a = as[getProbe() & m]) == null || !(uncontended = a.cas(v = a.value, v + x))) // 當cells陣列不為null,並且通過getProbe() & m // 定位的Cell物件不為null時進行第二次CAS操作。 // 如果執行不成功,則進入longAccumulate函式。 longAccumulate(x, null, uncontended); } } 複製程式碼
當併發量較少時,cell陣列尚未初始化,所以只調用 casBase
函式,對base變數進行CAS累加。

我們來看一下 casBase
函式相關的原始碼吧。我們可以認為變數 base
就是第一個value值,也是基礎value變數。先呼叫casBase函式來cas一下base變數,如果成功了,就不需要在進行下面比較複雜的演算法,
final boolean casBase(long cmp, long val) { return UNSAFE.compareAndSwapLong(this, BASE, cmp, val); } 複製程式碼
當併發量逐漸提高時, casBase
函式會失敗。如果cells陣列為null或為空,就直接呼叫 longAccumulate
方法。因為cells為null或在為空,說明cells未初始化,所以呼叫 longAccumulate
進行初始化。否則繼續判斷。 如果cells中已經初始化,就繼續進行後續判斷。我們先來理解一下 getProbe() & m
的這個操作吧,可以把這個操作當作一次計算"hash"值,然後將cells中這個位置的Cell物件賦值給變數a。如果變數a不為null,那麼就呼叫該物件的cas方法去設定其value值。如果a為null,或在cas賦值發生衝突,那麼呼叫 longAccumulate
方法。

LongAccumulate方法
longAccumulate
函式比較複雜,帶有我的註釋的程式碼已經貼在了文章後邊,這裡我們就只講一下其中比較關鍵的一些技巧和思想。
首先,我們都知道只有當對 base
的cas操作失敗之後, LongAdder
才引入 Cell
陣列.所以在 longAccumulate
中就是對 Cell
陣列進行操作,分別涉及了陣列的初始化,擴容和設定某個位置的Cell物件等操作。
在這段程式碼中,關於 cellBusy
的cas操作構成了一個SpinLock,這就是經典的SpinLock的程式設計技巧,大家可以學習一下。
我們先來看一下 longAccumulate
的主體程式碼,首先是一個無限for迴圈,然後根據cells陣列的狀態來判斷是要進行cells陣列的初始化,還是進行物件新增或者擴容。
final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) { int h; if ((h = getProbe()) == 0) { //獲取PROBE變數,探針變數,與當前執行的執行緒相關,不同執行緒不同 ThreadLocalRandom.current(); //初始化PROBE變數,和getProbe都使用Unsafe類提供的原子性操作。 h = getProbe(); wasUncontended = true; } boolean collide = false; for (;;) { //cas經典無限迴圈,不斷嘗試 Cell[] as; Cell a; int n; long v; if ((as = cells) != null && (n = as.length) > 0) { // cells不為null,並且陣列size大於0,表示cells已經初始化了 // 初始化Cell物件並設定到陣列中或者進行陣列擴容 } else if (cellsBusy == 0 && cells == as && casCellsBusy()) { //cells陣列未初始化,獲得cellsBusy lock,進行cells陣列的初始化 // cells陣列初始化操作 } //如果初始化陣列失敗了,那就再次嘗試一下直接cas base變數, // 如果成功了就直接返回,這是最後一個進行CAS操作的地方。 else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) break; } } 複製程式碼
進行Cell陣列程式碼如下所示,它首先呼叫 casCellsBusy
函式獲取了 cellsBusy
‘鎖’,然後進行陣列的初始化操作,最後將 cellBusy
'鎖'釋放掉。
// 注意在進入這段程式碼之前已經casCellsBusy獲得cellsBusy這個鎖變量了。 boolean init = false; try { if (cells == as) { Cell[] rs = new Cell[2]; rs[h & 1] = new Cell(x); //設定x的值為cell物件的value值 cells = rs; init = true; } } finally { cellsBusy = 0; } if (init) break; 複製程式碼

如果Cell陣列已經初始化過了,那麼就進行Cell陣列的設定或者擴容。這部分程式碼有一系列的if else的判斷,如果前一個條件不成立,才會進入下一條判斷。
首先,當Cell陣列中對應位置的cell物件為null時,表明該位置的Cell物件需要進行初始化,所以使用 casCellsBusy
函式獲取'鎖',然後初始化Cell物件,並且設定進cells陣列,最後釋放掉'鎖'。
當Cell陣列中對應位置的cell物件不為null,則直接呼叫其cas操作進行累加。
當上述操作都失敗後,認為多個執行緒在對同一個位置的Cell物件進行操作,這個Cell物件是一個“熱點”,所以Cell陣列需要進行擴容,將熱點分散。
if ((a = as[(n - 1) & h]) == null) { //通過與操作計算出來需要操作的Cell物件的座標 if (cellsBusy == 0) { //volatile 變數,用來實現spinLock,來在初始化和resize cells陣列時使用。 //當cellsBusy為0時,表示當前可以對cells陣列進行操作。 Cell r = new Cell(x);//將x值直接賦值給Cell物件 if (cellsBusy == 0 && casCellsBusy()) {//如果這個時候cellsBusy還是0 //就cas將其設定為非0,如果成功了就是獲得了spinLock的鎖.可以對cells陣列進行操作. //如果失敗了,就會再次執行一次迴圈 boolean created = false; try { Cell[] rs; int m, j; //判斷cells是否已經初始化,並且要操作的位置上沒有cell物件. if ((rs = cells) != null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) { rs[j] = r; //將之前建立的值為x的cell物件賦值到cells陣列的響應位置. created = true; } } finally { //經典的spinLock程式設計技巧,先獲得鎖,然後try finally將鎖釋放掉 //將cellBusy設定為0就是釋放鎖. cellsBusy = 0; } if (created) break; //如果建立成功了,就是使用x建立了新的cell物件,也就是新建立了一個分擔熱點的value continue; } } collide = false; //未發生碰撞 } else if (!wasUncontended)//是否已經發生過一次cas操作失敗 wasUncontended = true; //設定成true,以便第二次進入下一個else if 判斷 else if (a.cas(v = a.value, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) //fn是操作型別,如果是空,就是相加,所以讓a這個cell物件中的value值和x相加,然後在cas設定,如果成果 //就直接返回 break; else if (n >= NCPU || cells != as) //如果cells陣列的大小大於系統的可獲得處理器數量或在as不再和cells相等. collide = false; else if (!collide) collide = true; else if (cellsBusy == 0 && casCellsBusy()) { //再次獲得cellsBusy這個spinLock,對陣列進行resize try { if (cells == as) {//要再次檢測as是否等於cells以免其他執行緒已經對cells進行了操作. Cell[] rs = new Cell[n << 1]; //擴容一倍 for (int i = 0; i < n; ++i) rs[i] = as[i]; cells = rs;//賦予cells一個新的陣列物件 } } finally { cellsBusy = 0; } collide = false; continue; } h = advanceProbe(h);//由於使用當前探針變數無法操作成功,所以重新設定一個,再次嘗試 複製程式碼

後記
本篇文章寫的不是很好,我寫完之後又看了一遍coolshell上的關於 LongAdder
的文章,感覺自己沒有人家寫的那麼簡潔明瞭。我對程式碼細節的註釋和投入太多了。其實很多程式碼大家都可以看懂,並不需要大量的程式碼片段加註釋。以後要注意一下。之後會接著研究一下JUC包中的其他類,希望大家多多關注。
