1. 程式人生 > >死磕 java併發包之LongAdder原始碼分析

死磕 java併發包之LongAdder原始碼分析

問題

(1)java8中為什麼要新增LongAdder?

(2)LongAdder的實現方式?

(3)LongAdder與AtomicLong的對比?

簡介

LongAdder是java8中新增的原子類,在多執行緒環境中,它比AtomicLong效能要高出不少,特別是寫多的場景。

它是怎麼實現的呢?讓我們一起來學習吧。

原理

LongAdder的原理是,在最初無競爭時,只更新base的值,當有多執行緒競爭時通過分段的思想,讓不同的執行緒更新不同的段,最後把這些段相加就得到了完整的LongAdder儲存的值。

原始碼分析

LongAdder繼承自Striped64抽象類,Striped64中定義了Cell內部類和各重要屬性。

主要內部類

// Striped64中的內部類,使用@sun.misc.Contended註解,說明裡面的值消除偽共享
@sun.misc.Contended static final class Cell {
    // 儲存元素的值,使用volatile修飾保證可見性
    volatile long value;
    Cell(long x) { value = x; }
    // CAS更新value的值
    final boolean cas(long cmp, long val) {
        return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
    }

    // Unsafe例項
    private static final sun.misc.Unsafe UNSAFE;
    // value欄位的偏移量
    private static final long valueOffset;
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> ak = Cell.class;
            valueOffset = UNSAFE.objectFieldOffset
                (ak.getDeclaredField("value"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}

Cell類使用@sun.misc.Contended註解,說明是要避免偽共享的。

使用Unsafe的CAS更新value的值,其中value的值使用volatile修飾,保證可見性。

關於Unsafe的介紹請檢視【死磕 java魔法類之Unsafe解析】。

關於偽共享的介紹請檢視【雜談 什麼是偽共享(false sharing)?】。

主要屬性

// 這三個屬性都在Striped64中
// cells陣列,儲存各個段的值
transient volatile Cell[] cells;
// 最初無競爭時使用的,也算一個特殊的段
transient volatile long base;
// 標記當前是否有執行緒在建立或擴容cells,或者在建立Cell
// 通過CAS更新該值,相當於是一個鎖
transient volatile int cellsBusy;

最初無競爭或有其它執行緒在建立cells陣列時使用base更新值,有過競爭時使用cells更新值。

最初無競爭是指一開始沒有執行緒之間的競爭,但也有可能是多執行緒在操作,只是這些執行緒沒有同時去更新base的值。

有過競爭是指只要出現過競爭不管後面有沒有競爭都使用cells更新值,規則是不同的執行緒hash到不同的cell上去更新,減少競爭。

add(x)方法

add(x)方法是LongAdder的主要方法,使用它可以使LongAdder中儲存的值增加x,x可為正可為負。

public void add(long x) {
    // as是Striped64中的cells屬性
    // b是Striped64中的base屬性
    // v是當前執行緒hash到的Cell中儲存的值
    // m是cells的長度減1,hash時作為掩碼使用
    // a是當前執行緒hash到的Cell
    Cell[] as; long b, v; int m; Cell a;
    // 條件1:cells不為空,說明出現過競爭,cells已經建立
    // 條件2:cas操作base失敗,說明其它執行緒先一步修改了base,正在出現競爭
    if ((as = cells) != null || !casBase(b = base, b + x)) {
        // true表示當前競爭還不激烈
        // false表示競爭激烈,多個執行緒hash到同一個Cell,可能要擴容
        boolean uncontended = true;
        // 條件1:cells為空,說明正在出現競爭,上面是從條件2過來的
        // 條件2:應該不會出現
        // 條件3:當前執行緒所在的Cell為空,說明當前執行緒還沒有更新過Cell,應初始化一個Cell
        // 條件4:更新當前執行緒所在的Cell失敗,說明現在競爭很激烈,多個執行緒hash到了同一個Cell,應擴容
        if (as == null || (m = as.length - 1) < 0 ||
            // getProbe()方法返回的是執行緒中的threadLocalRandomProbe欄位
            // 它是通過隨機數生成的一個值,對於一個確定的執行緒這個值是固定的
            // 除非刻意修改它
            (a = as[getProbe() & m]) == null ||
            !(uncontended = a.cas(v = a.value, v + x)))
            // 呼叫Striped64中的方法處理
            longAccumulate(x, null, uncontended);
    }
}

(1)最初無競爭時只更新base;

(2)直到更新base失敗時,建立cells陣列;

(3)當多個執行緒競爭同一個Cell比較激烈時,可能要擴容;

longAccumulate()方法

final void longAccumulate(long x, LongBinaryOperator fn,
                              boolean wasUncontended) {
    // 儲存執行緒的probe值
    int h;
    // 如果getProbe()方法返回0,說明隨機數未初始化
    if ((h = getProbe()) == 0) {
        // 強制初始化
        ThreadLocalRandom.current(); // force initialization
        // 重新獲取probe值
        h = getProbe();
        // 都未初始化,肯定還不存在競爭激烈
        wasUncontended = true;
    }
    // 是否發生碰撞
    boolean collide = false;                // True if last slot nonempty
    for (;;) {
        Cell[] as; Cell a; int n; long v;
        // cells已經初始化過
        if ((as = cells) != null && (n = as.length) > 0) {
            // 當前執行緒所在的Cell未初始化
            if ((a = as[(n - 1) & h]) == null) {
                // 當前無其它執行緒在建立或擴容cells,也沒有執行緒在建立Cell
                if (cellsBusy == 0) {       // Try to attach new Cell
                    // 新建一個Cell,值為當前需要增加的值
                    Cell r = new Cell(x);   // Optimistically create
                    // 再次檢測cellsBusy,並嘗試更新它為1
                    // 相當於當前執行緒加鎖
                    if (cellsBusy == 0 && casCellsBusy()) {
                        // 是否建立成功
                        boolean created = false;
                        try {               // Recheck under lock
                            Cell[] rs; int m, j;
                            // 重新獲取cells,並找到當前執行緒hash到cells陣列中的位置
                            // 這裡一定要重新獲取cells,因為as並不在鎖定範圍內
                            // 有可能已經擴容了,這裡要重新獲取
                            if ((rs = cells) != null &&
                                (m = rs.length) > 0 &&
                                rs[j = (m - 1) & h] == null) {
                                // 把上面新建的Cell放在cells的j位置處
                                rs[j] = r;
                                // 建立成功
                                created = true;
                            }
                        } finally {
                            // 相當於釋放鎖
                            cellsBusy = 0;
                        }
                        // 建立成功了就返回
                        // 值已經放在新建的Cell裡面了
                        if (created)
                            break;
                        continue;           // Slot is now non-empty
                    }
                }
                // 標記當前未出現衝突
                collide = false;
            }
            // 當前執行緒所在的Cell不為空,且更新失敗了
            // 這裡簡單地設為true,相當於簡單地自旋一次
            // 通過下面的語句修改執行緒的probe再重新嘗試
            else if (!wasUncontended)       // CAS already known to fail
                wasUncontended = true;      // Continue after rehash
            // 再次嘗試CAS更新當前執行緒所在Cell的值,如果成功了就返回
            else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                         fn.applyAsLong(v, x))))
                break;
            // 如果cells陣列的長度達到了CPU核心數,或者cells擴容了
            // 設定collide為false並通過下面的語句修改執行緒的probe再重新嘗試
            else if (n >= NCPU || cells != as)
                collide = false;            // At max size or stale
            // 上上個elseif都更新失敗了,且上個條件不成立,說明出現衝突了
            else if (!collide)
                collide = true;
            // 明確出現衝突了,嘗試佔有鎖,並擴容
            else if (cellsBusy == 0 && casCellsBusy()) {
                try {
                    // 檢查是否有其它執行緒已經擴容過了
                    if (cells == as) {      // Expand table unless stale
                        // 新陣列為原陣列的兩倍
                        Cell[] rs = new Cell[n << 1];
                        // 把舊陣列元素拷貝到新陣列中
                        for (int i = 0; i < n; ++i)
                            rs[i] = as[i];
                        // 重新賦值cells為新陣列
                        cells = rs;
                    }
                } finally {
                    // 釋放鎖
                    cellsBusy = 0;
                }
                // 已解決衝突
                collide = false;
                // 使用擴容後的新陣列重新嘗試
                continue;                   // Retry with expanded table
            }
            // 更新失敗或者達到了CPU核心數,重新生成probe,並重試
            h = advanceProbe(h);
        }
        // 未初始化過cells陣列,嘗試佔有鎖並初始化cells陣列
        else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
            // 是否初始化成功
            boolean init = false;
            try {                           // Initialize table
                // 檢測是否有其它執行緒初始化過
                if (cells == as) {
                    // 新建一個大小為2的Cell陣列
                    Cell[] rs = new Cell[2];
                    // 找到當前執行緒hash到陣列中的位置並建立其對應的Cell
                    rs[h & 1] = new Cell(x);
                    // 賦值給cells陣列
                    cells = rs;
                    // 初始化成功
                    init = true;
                }
            } finally {
                // 釋放鎖
                cellsBusy = 0;
            }
            // 初始化成功直接返回
            // 因為增加的值已經同時建立到Cell中了
            if (init)
                break;
        }
        // 如果有其它執行緒在初始化cells陣列中,就嘗試更新base
        // 如果成功了就返回
        else if (casBase(v = base, ((fn == null) ? v + x :
                                    fn.applyAsLong(v, x))))
            break;                          // Fall back on using base
    }
}

(1)如果cells陣列未初始化,當前執行緒會嘗試佔有cellsBusy鎖並建立cells陣列;

(2)如果當前執行緒嘗試建立cells陣列時,發現有其它執行緒已經在建立了,就嘗試更新base,如果成功就返回;

(3)通過執行緒的probe值找到當前執行緒應該更新cells陣列中的哪個Cell;

(4)如果當前執行緒所在的Cell未初始化,就佔有佔有cellsBusy鎖並在相應的位置建立一個Cell;

(5)嘗試CAS更新當前執行緒所在的Cell,如果成功就返回,如果失敗說明出現衝突;

(5)當前執行緒更新Cell失敗後並不是立即擴容,而是嘗試更新probe值後再重試一次;

(6)如果在重試的時候還是更新失敗,就擴容;

(7)擴容時當前執行緒佔有cellsBusy鎖,並把陣列容量擴大到兩倍,再遷移原cells陣列中元素到新陣列中;

(8)cellsBusy在建立cells陣列、建立Cell、擴容cells陣列三個地方用到;

sum()方法

sum()方法是獲取LongAdder中真正儲存的值的大小,通過把base和所有段相加得到。

public long sum() {
    Cell[] as = cells; Cell a;
    // sum初始等於base
    long sum = base;
    // 如果cells不為空
    if (as != null) {
        // 遍歷所有的Cell
        for (int i = 0; i < as.length; ++i) {
            // 如果所在的Cell不為空,就把它的value累加到sum中
            if ((a = as[i]) != null)
                sum += a.value;
        }
    }
    // 返回sum
    return sum;
}

可以看到sum()方法是把base和所有段的值相加得到,那麼,這裡有一個問題,如果前面已經累加到sum上的Cell的value有修改,不是就沒法計算到了麼?

答案確實如此,所以LongAdder可以說不是強一致性的,它是最終一致性的。

LongAdder VS AtomicLong

直接上程式碼:

public class LongAdderVSAtomicLongTest {
    public static void main(String[] args){
        testAtomicLongVSLongAdder(1, 10000000);
        testAtomicLongVSLongAdder(10, 10000000);
        testAtomicLongVSLongAdder(20, 10000000);
        testAtomicLongVSLongAdder(40, 10000000);
        testAtomicLongVSLongAdder(80, 10000000);
    }

    static void testAtomicLongVSLongAdder(final int threadCount, final int times){
        try {
            System.out.println("threadCount:" + threadCount + ", times:" + times);
            long start = System.currentTimeMillis();
            testLongAdder(threadCount, times);
            System.out.println("LongAdder elapse:" + (System.currentTimeMillis() - start) + "ms");

            long start2 = System.currentTimeMillis();
            testAtomicLong(threadCount, times);
            System.out.println("AtomicLong elapse:" + (System.currentTimeMillis() - start2) + "ms");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    static void testAtomicLong(final int threadCount, final int times) throws InterruptedException {
        AtomicLong atomicLong = new AtomicLong();
        List<Thread> list = new ArrayList<>();
        for (int i=0;i<threadCount;i++){
            list.add(new Thread(() -> {
                for (int j = 0; j<times; j++){
                    atomicLong.incrementAndGet();
                }
            }));
        }

        for (Thread thread : list){
            thread.start();
        }

        for (Thread thread : list){
            thread.join();
        }
    }

    static void testLongAdder(final int threadCount, final int times) throws InterruptedException {
        LongAdder longAdder = new LongAdder();
        List<Thread> list = new ArrayList<>();
        for (int i=0;i<threadCount;i++){
            list.add(new Thread(() -> {
                for (int j = 0; j<times; j++){
                    longAdder.add(1);
                }
            }));
        }

        for (Thread thread : list){
            thread.start();
        }

        for (Thread thread : list){
            thread.join();
        }
    }
}

執行結果如下:

threadCount:1, times:10000000
LongAdder elapse:158ms
AtomicLong elapse:64ms
threadCount:10, times:10000000
LongAdder elapse:206ms
AtomicLong elapse:2449ms
threadCount:20, times:10000000
LongAdder elapse:429ms
AtomicLong elapse:5142ms
threadCount:40, times:10000000
LongAdder elapse:840ms
AtomicLong elapse:10506ms
threadCount:80, times:10000000
LongAdder elapse:1369ms
AtomicLong elapse:20482ms

可以看到當只有一個執行緒的時候,AtomicLong反而效能更高,隨著執行緒越來越多,AtomicLong的效能急劇下降,而LongAdder的效能影響很小。

總結

(1)LongAdder通過base和cells陣列來儲存值;

(2)不同的執行緒會hash到不同的cell上去更新,減少了競爭;

(3)LongAdder的效能非常高,最終會達到一種無競爭的狀態;

彩蛋

在longAccumulate()方法中有個條件是n >= NCPU就不會走到擴容邏輯了,而n是2的倍數,那是不是代表cells陣列最大隻能達到大於等於NCPU的最小2次方?

答案是明確的。因為同一個CPU核心同時只會執行一個執行緒,而更新失敗了說明有兩個不同的核心更新了同一個Cell,這時會重新設定更新失敗的那個執行緒的probe值,這樣下一次它所在的Cell很大概率會發生改變,如果執行的時間足夠長,最終會出現同一個核心的所有執行緒都會hash到同一個Cell(大概率,但不一定全在一個Cell上)上去更新,所以,這裡cells陣列中長度並不需要太長,達到CPU核心數足夠了。

比如,筆者的電腦是8核的,所以這裡cells的陣列最大隻會到8,達到8就不會擴容了。


歡迎關注我的公眾號“彤哥讀原始碼”,檢視更多原始碼系列文章, 與彤哥一起暢遊原始碼的海洋。

相關推薦

java發包LongAdder原始碼分析

問題 (1)java8中為什麼要新增LongAdder? (2)LongAdder的實現方式? (3)LongAdder與AtomicLong的對比? 簡介 LongAdder是java8中新增的原子類,在多執行緒環境中,它比AtomicLong效能要高出不少,特別是寫多的場景。 它是怎麼實現的呢?讓我們一起

java發包AtomicStampedReference原始碼分析(ABA問題詳解)

問題 (1)什麼是ABA? (2)ABA的危害? (3)ABA的解決方法? (4)AtomicStampedReference是什麼? (5)AtomicStampedReference是怎麼解決ABA的? 簡介 AtomicStampedReference是java併發包下提供的一個原子類,它能解決其它原子

java發包LongAdder源碼分析

ica sys offset ktr 遷移 對比 .get unsafe join() 問題 (1)java8中為什麽要新增LongAdder? (2)LongAdder的實現方式? (3)LongAdder與AtomicLong的對比? 簡介 LongAdder是java

java同步系列ReentrantLock原始碼解析(一)——公平鎖、非公平鎖

問題 (1)重入鎖是什麼? (2)ReentrantLock如何實現重入鎖? (3)ReentrantLock為什麼預設是非公平模式? (4)ReentrantLock除了可重入還有哪些特性? 簡介 Reentrant = Re + entrant,Re是重複、又、再的意思,entrant是enter的名詞或

java同步系列ReentrantLock原始碼解析(二)——條件鎖

問題 (1)條件鎖是什麼? (2)條件鎖適用於什麼場景? (3)條件鎖的await()是在其它執行緒signal()的時候喚醒的嗎? 簡介 條件鎖,是指在獲取鎖之後發現當前業務場景自己無法處理,而需要等待某個條件的出現才可以繼續處理時使用的一種鎖。 比如,在阻塞佇列中,當佇列中沒有元素的時候是無法彈出一個元素

java同步系列ReentrantReadWriteLock原始碼解析

問題 (1)讀寫鎖是什麼? (2)讀寫鎖具有哪些特性? (3)ReentrantReadWriteLock是怎麼實現讀寫鎖的? (4)如何使用ReentrantReadWriteLock實現高效安全的TreeMap? 簡介 讀寫鎖是一種特殊的鎖,它把對共享資源的訪問分為讀訪問和寫訪問,多個執行緒可以同時對共享

java同步系列Semaphore原始碼解析

問題 (1)Semaphore是什麼? (2)Semaphore具有哪些特性? (3)Semaphore通常使用在什麼場景中? (

java同步系列StampedLock原始碼解析

問題 (1)StampedLock是什麼? (2)StampedLock具有什麼特性? (3)StampedLock是否支援可重入

java同步系列CyclicBarrier原始碼解析——有圖有真相

問題 (1)CyclicBarrier是什麼? (2)CyclicBarrier具有什麼特性? (3)CyclicBarrier與

java同步系列Phaser原始碼解析

問題 (1)Phaser是什麼? (2)Phaser具有哪些特性? (3)Phaser相對於CyclicBarrier和Count

深入理解Java發包ConcurrentHashMap

【宣告】本部落格大部分內容來自公眾號ImportNew HashMap的容量由負載因子決定,插入的元素超過了容量的範圍就會觸發擴容操作,就是rehash。 在多執行緒環境下,若同時存在其他元素進行put操作,如果hash值相同,可能出現在同一陣列下用連結串列

Java發包Lock鎖和Condition條件

一、Synchronized synchronized是Java的一個關鍵字,也就是Java語言內建的特性,如果一個程式碼塊被synchronized修飾了,當一個執行緒獲取了對應的鎖,執行程式碼塊時,其他執行緒便只能一直等待,等待獲取鎖的執行緒釋放鎖,而獲取

java魔法類Unsafe解析

cat 序列化 簡介 分析 img 內部 基本功 圖片 resource 問題 (1)Unsafe是什麽? (2)Unsafe只有CAS的功能嗎? (3)Unsafe為什麽是不安全的? (4)怎麽使用Unsafe? 簡介 本章是java並發包專題的第一章,但是第一篇寫的卻不

java原子類終結篇(面試題)

static ets new 點擊 比較 原子操作 地址 累加 turn 概覽 原子操作是指不會被線程調度機制打斷的操作,這種操作一旦開始,就一直運行到結束,中間不會有任何線程上下文切換。 原子操作可以是一個步驟,也可以是多個操作步驟,但是其順序不可以被打亂,也不可以被切割

java同步系列開篇

討論 關註 使用 避免死鎖 更新數據 讀寫 上下文切換 monit 缺點 簡介 同步系列,這是彤哥想了好久的名字,本來是準備寫鎖相關的內容,但是java中的CountDownLatch、Semaphore、CyclicBarrier這些類又不屬於鎖,它們和鎖又有很多共同點,

java同步系列JMM(Java Memory Model)

簡介 Java記憶體模型是在硬體記憶體模型上的更高層的抽象,它遮蔽了各種硬體和作業系統訪問的差異性,保證了Java程式在各種平臺下對記憶體的訪問都能達到一致的效果。 硬體記憶體模型 在正式講解Java的記憶體模型之前,我們有必要先了解一下硬體層面的一些東西。 在現代計算機的硬體體系中,CPU的運算速度是非常快

java同步系列volatile解析

問題 (1)volatile是如何保證可見性的? (2)volatile是如何禁止重排序的? (3)volatile的實現原理? (4)volatile的缺陷? 簡介 volatile可以說是Java虛擬機器提供的最輕量級的同步機制了,但是它並不容易被正確地理解,以至於很多人不習慣使用它,遇到多執行緒問題一律

java同步系列synchronized解析

問題 (1)synchronized的特性? (2)synchronized的實現原理? (3)synchronized是否可重入? (4)synchronized是否是公平鎖? (5)synchronized的優化? (6)synchronized的五種使用方式? 簡介 synchronized關鍵字是Ja

java同步系列自己動手寫一個鎖Lock

問題 (1)自己動手寫一個鎖需要哪些知識? (2)自己動手寫一個鎖到底有多簡單? (3)自己能不能寫出來一個完美的鎖? 簡介 本篇文章的目標一是自己動手寫一個鎖,這個鎖的功能很簡單,能進行正常的加鎖、解鎖操作。 本篇文章的目標二是通過自己動手寫一個鎖,能更好地理解後面章節將要學習的AQS及各種同步器實現的原理