1. 程式人生 > >Java併發學習(十一)-LongAdder和LongAccumulator探究

Java併發學習(十一)-LongAdder和LongAccumulator探究

Java8在atomic包新增了5個類,分別是Striped64LongAdderLongAccumulatorDoubleAdderDoubleAccumulator。其中,Sriped64作為父類,其他分別是long和double的具體實現。
下面首先從父類Striped64這個類開始講,其幾個類都是遵從它的結構進行實現的。

What is Striped64

Striped64,就向一個AtomicLong,裡面維持一個volatile的base,還有一個cell陣列,cell陣列主要是儲存執行緒需要增加或減少的值,它能夠將競爭的執行緒分散到自己內部的私有cell數組裡面,所以當併發量很大的時候,執行緒會被部分分發去訪問內部的cell陣列。這裡先看程式碼結構:

偽共享

在學習Striped64程式碼時候,遇到了一個新的知識點,就是偽共享(False Sharing):

@sun.misc.Contended static final class Cell

Cell的函式頭定義是這樣的,加了:@sun.misc.Contended 這個有什麼用呢?
在cpu執行指令時,通常會把指令載入到緩衝行(cache line)中,cpu的快取系統中是以快取行(cache line)為單位儲存的,快取行是2的整數冪個連續位元組,一般為32-256個位元組。最常見的快取行大小是64個位元組,cache line是cache和memory之間資料傳輸的最小單元。但是由於系統可能會把兩個不同的cpu處理的指令放到一起。
也就是,著一個cache line,既有cpu1所需執行的指令,又有cpu2所需執行的指令。


這樣一來,當出現競爭時,如果cpu1獲得了所有權,快取子系統將會使核心2中對應的快取行失效。當cpu2獲得了所有權然後執行更新操作,核心1就要使自己對應的快取行失效。這會來來回回的經過L3快取,大大影響了效能。如果互相競爭的核心位於不同的插槽,就要額外橫跨插槽連線,問題可能更加嚴重。

這個問題在Java7中,是通過填充long欄位來實現cache line的獨立性,,也就是本cache line裡面就只有我的變數:

public class VolatileLongPadding {
        volatile long p0, p1, p2, p3, p4, p5, p6;
        volatile
long v = 0L; volatile long q0, q1, q2, q3, q4, q5, q6; }

所以,加入@sun.misc.Contended 就是為了解決這個問題,讓當前執行緒執行操作變數處於一個獨立的cache line裡面。

Striped64程式碼結構

@SuppressWarnings("serial")
abstract class Striped64 extends Number {
    /**
     * 用@sun.misc.Contended來杜絕為共享。用來儲存衝突時需要增加的格子。cell
     * CAS方式。
     */
    @sun.misc.Contended
    static final class Cell {
        volatile long value;

        Cell(long x) {
            value = x;
        }

        //CAS操作
        final boolean cas(long cmp, long val) {
            return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
        }

        // Unsafe mechanics
        private static final sun.misc.Unsafe UNSAFE;
        private static final long valueOffset;
        static {
            try {
                UNSAFE = sun.misc.Unsafe.getUnsafe();
                Class<?> ak = Cell.class;
                valueOffset = UNSAFE.objectFieldOffset(ak.getDeclaredField("value"));
            } catch (Exception e) {
                throw new Error(e);
            }
        }
    }

    /** Number of CPUS, to place bound on table size
     * cpu的個數,繫結的table。 */
    static final int NCPU = Runtime.getRuntime().availableProcessors();

    //cells陣列,大小為2的倍數
    transient volatile Cell[] cells;

    //基礎的值。不衝突下直接在base上增加,通過CAS更改。
    transient volatile long base;

    //判斷cells是否有執行緒在使用的變數,通過CAS去鎖定。
    transient volatile int cellsBusy;
    ...

上述程式碼主要變數:
- cells:用於儲存衝突的執行緒
- base:基礎的數值,當沒有衝突或衝突很少時,就會在base上操作,而不用加入cell,也就是AtomicLong的原理
- cellsBusy:判斷cells是否被一個執行緒使用了,如果一個執行緒使用,就不自旋一次換個probe來進行。

Striped64裡面有兩個主要的方法:longAccumulatedoubleAccumulate ,兩個方法方法實現幾乎一模一樣,這裡主要以longAccumulate為例分析:

先說說主要思路,如果能夠通過CAS修改base成功,那麼直接退出(併發裡量不大),否則去cells裡面佔一個非空的坑(併發量大),並把要操作的值賦值儲存在一個Cell裡面。

所以這樣,只能去尋找沒有值的cell,如果有值,就說明以前有執行緒也是由於競爭base失敗從而來cells這裡報個到,到時候當去總數時才能加自己。

接下來看具體實現程式碼:

    /**
     * 裡面可以重新改變table大小,或者建立新的cells。
     * @param x 增加的long值
     * @param fn 函數語言程式設計,代表一個一個待執行操作的函式
     * @param wasUncontended
    final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) {
        int h;
        if ((h = getProbe()) == 0) {
            //如果當前執行緒沒有初始化,就初始化下當前執行緒。
            ThreadLocalRandom.current(); // force initialization
            h = getProbe();
            wasUncontended = true;
        }
        boolean collide = false; // True if last slot nonempty
        for (;;) {
            Cell[] as;
            Cell a;
            int n;
            long v;
            if ((as = cells) != null && (n = as.length) > 0) {
                //cell有值。
                if ((a = as[(n - 1) & h]) == null) {
                    //進入這個方法,就說明這個位置沒執行緒,所以你可以進來。進來後再看能不能獲取到cell鎖。
                    if (cellsBusy == 0) { // Try to attach new Cell
                        //新建一個cell,並且嘗試加進去
                        Cell r = new Cell(x); // Optimistically create
                        if (cellsBusy == 0 && casCellsBusy()) {
                            boolean created = false;
                            try { // Recheck under lock
                                Cell[] rs;
                                int m, j;
                                if ((rs = cells) != null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) {
                                    rs[j] = r;
                                    created = true;
                                }
                            } finally {
                                cellsBusy = 0;
                            }
                            if (created)
                                break;
                            continue; // Slot is now non-empty
                        }
                    }
                    collide = false;  
                } else if (!wasUncontended) // CAS already known to fail
                    wasUncontended = true; // Continue after rehash
                else if (a.cas(v = a.value, ((fn == null) ? v + x : fn.applyAsLong(v, x))))
                    break;
                else if (n >= NCPU || cells != as)
                    collide = false; // At max size or stale
                else if (!collide)
                    collide = true;
                else if (cellsBusy == 0 && casCellsBusy()) {
                    try {
                        //擴容操作。
                        if (cells == as) { // Expand table unless stale
                            Cell[] rs = new Cell[n << 1];
                            for (int i = 0; i < n; ++i)
                                rs[i] = as[i];
                            cells = rs;
                        }
                    } finally {
                        cellsBusy = 0;
                    }
                    collide = false;
                    continue; // Retry with expanded table
                }
                h = advanceProbe(h);
            } else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
                //這是cell初始化的過程
                //直接修改base不成功,所以來修改cells做文章。
                //cell為null,但是cellsBusy=0,但是有,加入一個cell中。
                boolean init = false;
                try { // Initialize table
                    if (cells == as) {
                        Cell[] rs = new Cell[2];             //最開始cells的大小為2
                        rs[h & 1] = new Cell(x);             //給要增加的x,做一個cell的坑。
                        cells = rs;
                        init = true;
                    }
                } finally {
                    //釋放鎖
                    cellsBusy = 0;
                }
                if (init)
                    break;
            } else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x))))

                //cell為null並且cellsBusy為1,也就是說,現在有人用cells,我就去嘗試更新base吧,接用CAS這個base來實現
                break; // Fall back on using base
        }
    }

在doubleAccumulate中,基本程式碼與longAccumulate一致,但是,Cell裡面定義的value是long型別的,所以程式碼中進行了如下轉化,把double儲存在long中:

Cell r = new Cell(Double.doubleToRawLongBits(x))

接下來看兩組具體實現的子類。

LongAdder和LongAccumulator

按照Doug Lea的描述,LongAdder效能優於AtomicLong,從上面分析可以知道,當併發量不高時,其實二者效能差不多,但是一旦併發量超過一定限度,毫無疑問CAS會失敗,對於AtomicLong則沒有其他解決方法,而LongAdder則可以通過cells陣列來進行部分的“分流”操作。

進入正題,說說LongAdder,首先看看它裡面方法:

  • public void add(long x):增加x
  • public void increment() :自增
  • public void decrement() :自減
  • public long sum() :求和
  • public void reset():重置cell陣列
  • public long sumThenReset():求和並重置

這裡主要看看add方法,其他都好理解:

    public void add(long x) {
        Cell[] as; long b, v; int m; Cell a;
        if ((as = cells) != null || !casBase(b = base, b + x)) {
        //如果cells不為null,或者CAS base變數失敗,說明衝突了,
        //置uncontended為true
            boolean uncontended = true;
            if (as == null || (m = as.length - 1) < 0 ||
                (a = as[getProbe() & m]) == null ||
                !(uncontended = a.cas(v = a.value, v + x)))
                //當as為null,或as的長度小於等於1,或a為null,,

                //判斷符合呼叫父類方法進行相加操作
                longAccumulate(x, null, uncontended);
        }
    }

思路還是很簡單的,而對於LongAccumulator,則是支援各種自定義運算的long運算。因為傳入了一個方法

    public LongAccumulator(LongBinaryOperator accumulatorFunction,
                           long identity) {
        this.function = accumulatorFunction;
        base = this.identity = identity;
    }

程式設計師可以自己定義二元的LongBinaryOperator並進行運算。LongAdder是相加,則LongAccumulator則是進行自己定義的操作。

DoubleAdder和DoubleAccumulator

對於DoubleAdderDoubleAccumulator,則是將其轉化為long型別後進行運算的:

//long轉為double
public static native double longBitsToDouble(long bits);
//double轉為long
public static native long doubleToRawLongBits(double value);

具體實現思路與long型別實現一致,不再贅述。