
Dissecting the High-Concurrency Data Structure Disruptor (2)

Sequence (continued)

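As discussed last time, Sequence avoids false sharing on its core field value by surrounding it with unused padding longs before and after it. So how are operations on value actually performed?
This requires some familiarity with the Unsafe class; see my other article, "Java Unsafe Class".
Unsafe provides a number of methods backed by native (C++) code. Sequence uses them as follows:
it obtains the Unsafe instance, uses it to look up the memory offset of the value field, and then writes to and CAS-updates value directly at that offset.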
com.lmax.disruptor.Sequence.java

import sun.misc.Unsafe;
import com.lmax.disruptor.util.Util;

public class Sequence extends RhsPadding
{
    static final long INITIAL_VALUE = -1L;
    private static final Unsafe UNSAFE;
    private static final long VALUE_OFFSET;

    static
    {
        UNSAFE = Util.getUnsafe();
        try
        {
            // value is declared in the superclass Value
            VALUE_OFFSET = UNSAFE.objectFieldOffset(Value.class.getDeclaredField("value"));
        }
        catch (final Exception e)
        {
            throw new RuntimeException(e);
        }
    }

    /** The default initial value is -1 */
    public Sequence()
    {
        this(INITIAL_VALUE);
    }

    public Sequence(final long initialValue)
    {
        UNSAFE.putOrderedLong(this, VALUE_OFFSET, initialValue);
    }

    /** Plain read of the volatile field value */
    public long get()
    {
        return value;
    }

    /** Ordered (lazy) store at value's offset: cheaper than a volatile write; earlier stores cannot be reordered after it */
    public void set(final long value)
    {
        UNSAFE.putOrderedLong(this, VALUE_OFFSET, value);
    }

    /** Full volatile store at value's offset */
    public void setVolatile(final long value)
    {
        UNSAFE.putLongVolatile(this, VALUE_OFFSET, value);
    }

    /** Atomic compare-and-swap at value's offset */
    public boolean compareAndSet(final long expectedValue, final long newValue)
    {
        return UNSAFE.compareAndSwapLong(this, VALUE_OFFSET, expectedValue, newValue);
    }

    public long incrementAndGet()
    {
        return addAndGet(1L);
    }

    /** Retry the CAS until no other thread has changed value in between */
    public long addAndGet(final long increment)
    {
        long currentValue;
        long newValue;
        do
        {
            currentValue = get();
            newValue = currentValue + increment;
        }
        while (!compareAndSet(currentValue, newValue));
        return newValue;
    }

    @Override
    public String toString()
    {
        return Long.toString(get());
    }
}
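Because sun.misc.Unsafe is an internal API, it may help to see the same three operations expressed with the public java.lang.invoke.VarHandle API (Java 9+). The following is a sketch of my own, not Disruptor source: the class name VhSequence is hypothetical and the padding is left out. setRelease plays roughly the role of putOrderedLong, setVolatile that of putLongVolatile, and compareAndSet that of compareAndSwapLong; addAndGet is the same CAS retry loop as above.

```java
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;

// Hypothetical VarHandle-based counterpart of Sequence (padding omitted).
class VhSequence {
    static final long INITIAL_VALUE = -1L;
    private static final VarHandle VALUE;

    static {
        try {
            VALUE = MethodHandles.lookup().findVarHandle(VhSequence.class, "value", long.class);
        } catch (ReflectiveOperationException e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    private volatile long value;

    VhSequence() { this(INITIAL_VALUE); }

    VhSequence(long initialValue) { VALUE.setRelease(this, initialValue); }

    long get() { return value; }  // volatile read

    // Release store, comparable to Unsafe.putOrderedLong
    void set(long newValue) { VALUE.setRelease(this, newValue); }

    // Volatile store, comparable to Unsafe.putLongVolatile
    void setVolatile(long newValue) { VALUE.setVolatile(this, newValue); }

    // Atomic CAS, comparable to Unsafe.compareAndSwapLong
    boolean compareAndSet(long expected, long newValue) {
        return VALUE.compareAndSet(this, expected, newValue);
    }

    long incrementAndGet() { return addAndGet(1L); }

    // Retry until our read-modify-write wins the CAS
    long addAndGet(long increment) {
        long current;
        long next;
        do {
            current = get();
            next = current + increment;
        } while (!compareAndSet(current, next));
        return next;
    }

    public static void main(String[] args) throws InterruptedException {
        VhSequence seq = new VhSequence();
        Runnable work = () -> {
            for (int i = 0; i < 100_000; i++) {
                seq.incrementAndGet();
            }
        };
        Thread t1 = new Thread(work);
        Thread t2 = new Thread(work);
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        System.out.println(seq.get());  // 199999: starts at -1 and no increment is lost
    }
}
```

Two threads racing on incrementAndGet never lose an update, because a thread whose CAS fails simply re-reads value and tries again.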

Producer

SingleProducerSequencer

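Let's start with the Producer side. Disruptor supports both single-producer and multi-producer modes; first we'll look at the core class for the single-producer case, SingleProducerSequencer. Its class structure is as follows:
[Figure: class diagram of SingleProducerSequencer and the interfaces it implements]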
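A brief description of these interfaces:
Cursored interface: a class implementing this interface can be thought of as one that tracks a particular sequence, its cursor. For example, a producer needs to know its current position in the RingBuffer when producing messages; that position is updated on every publish, and getCursor is used to read it.
Sequenced interface: a class implementing this interface can be thought of as an ordered storage structure, which is one of the defining characteristics of the RingBuffer. When a Producer produces an Event, it first claims the Sequence of the next slot, then fills in the Event, and finally publishes it; only after publication can the Event be consumed.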

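  • getBufferSize: returns the size of the RingBuffer
  • hasAvailableCapacity: checks whether there is enough free space
  • remainingCapacity: returns the remaining free space in the RingBuffer
  • next: claims the next one or n sequences (values) as the positions at which events will be produced
  • tryNext: tries to claim the next one or n sequences (values); throws InsufficientCapacityException if there is not enough capacity
  • publish: publishes an Event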

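Sequencer interface: extends both Cursored and Sequenced and, on top of them, adds methods related to consumption and production. One important part of its design is the GatingSequence:
As we will see later, the head of the RingBuffer is maintained by a Sequence object named cursor, which coordinates producers writing data into the RingBuffer. The Sequence representing the tail of the queue is not stored in the RingBuffer at all; it is maintained by the consumers, so maintaining the tail requires no locks. The price is that the producer has to track more information to decide whether the RingBuffer is full. That is the job of the GatingSequences: they track the relevant consumer Sequences.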

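  • INITIAL_CURSOR_VALUE: -1, the starting value of a sequence
  • claim: claims a specific sequence; only used when the RingBuffer is initialised to a specific starting value (generally only with multiple producers)
  • isAvailable: non-blocking check of whether a sequence has been published and can be consumed
  • addGatingSequences: adds the given sequences to the gatingSequences that need to be tracked
  • removeGatingSequence: removes a sequence from gatingSequences
  • newBarrier: given a set of sequences to track, creates a SequenceBarrier, which consumers use to determine whether a position can be consumed yet
  • getMinimumSequence: returns the smallest sequence among this RingBuffer's gatingSequences
  • getHighestPublishedSequence: returns the highest sequence that is safe to read
  • newPoller: not covered here; EventPoller is not used in this series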

Next, the abstract class AbstractSequencer implements the Sequencer interface and defines five fields:

    private static final AtomicReferenceFieldUpdater<AbstractSequencer, Sequence[]> SEQUENCE_UPDATER = AtomicReferenceFieldUpdater.newUpdater(AbstractSequencer.class, Sequence[].class, "gatingSequences");
    protected final int bufferSize;
    protected final WaitStrategy waitStrategy;
    protected final Sequence cursor = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
    protected volatile Sequence[] gatingSequences = new Sequence[0];
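  • SEQUENCE_UPDATER: a utility for atomically updating gatingSequences
  • bufferSize: the size of the RingBuffer being produced into
  • waitStrategy: this producer's wait strategy (covered later)
  • cursor: the producer's position, initially -1
  • gatingSequences: described above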

The constructor enforces some constraints on its arguments:

public AbstractSequencer(int bufferSize, WaitStrategy waitStrategy)
{
    if (bufferSize < 1)
    {
        throw new IllegalArgumentException("bufferSize must not be less than 1");
    }
    if (Integer.bitCount(bufferSize) != 1)
    {
        throw new IllegalArgumentException("bufferSize must be a power of 2");
    }

    this.bufferSize = bufferSize;
    this.waitStrategy = waitStrategy;
}

bufferSize must be at least 1 and must be a power of 2; my first article explains why.
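A practical consequence of the power-of-two rule is that the slot for an ever-growing sequence can be found with a bit mask instead of a modulo (the walkthrough at the end of this section takes sequence modulo bufferSize). The sketch below is illustrative only: RingIndex and slotOf are hypothetical names, not Disruptor API; the two checks are copied from the constructor above.

```java
// Hypothetical helper (not Disruptor API): validates bufferSize like AbstractSequencer
// and maps a sequence onto a slot with a bit mask.
class RingIndex {
    private final long indexMask;

    RingIndex(int bufferSize) {
        if (bufferSize < 1) {
            throw new IllegalArgumentException("bufferSize must not be less than 1");
        }
        if (Integer.bitCount(bufferSize) != 1) {
            throw new IllegalArgumentException("bufferSize must be a power of 2");
        }
        // For a power of two, bufferSize - 1 is a run of low 1-bits, e.g. 4 -> 0b11
        this.indexMask = bufferSize - 1;
    }

    // Same result as sequence % bufferSize for sequence >= 0, but a single AND
    int slotOf(long sequence) {
        return (int) (sequence & indexMask);
    }

    public static void main(String[] args) {
        RingIndex ring = new RingIndex(4);
        for (long sequence = 0; sequence < 10; sequence++) {
            System.out.println("sequence " + sequence + " -> slot " + ring.slotOf(sequence));
        }
    }
}
```

Note that the `bufferSize < 1` check is not redundant: `Integer.bitCount(Integer.MIN_VALUE)` is 1, so without it that negative size would pass the power-of-two test.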
getCursor and getBufferSize are implemented as simple getters:

@Override
public final long getCursor()
{
    return cursor.get();
}

@Override
public final int getBufferSize()
{
    return bufferSize;
}

addGatingSequences and removeGatingSequence, by contrast, perform atomic updates:

public final void addGatingSequences(Sequence... gatingSequences)
{
    SequenceGroups.addSequences(this, SEQUENCE_UPDATER, this, gatingSequences);
}

public boolean removeGatingSequence(Sequence sequence)
{
    return SequenceGroups.removeSequence(this, SEQUENCE_UPDATER, sequence);
}

The static methods of the atomic-update utility class SequenceGroups:

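/**
 * Atomically add sequences
 *
 * @param holder the object that owns the field being updated atomically
 * @param updater the updater for that field
 * @param cursor supplies the current cursor position
 * @param sequencesToAdd the sequences to add
 * @param <T> the type of the holder
 */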
static <T> void addSequences(
        final T holder,
        final AtomicReferenceFieldUpdater<T, Sequence[]> updater,
        final Cursored cursor,
        final Sequence... sequencesToAdd)
{
    long cursorSequence;
    Sequence[] updatedSequences;
    Sequence[] currentSequences;
    //Until the CAS succeeds, keep re-reading currentSequences and copying it into updatedSequences with all new sequences appended
    do
    {
        currentSequences = updater.get(holder);
        updatedSequences = copyOf(currentSequences, currentSequences.length + sequencesToAdd.length);
        cursorSequence = cursor.getCursor();

        int index = currentSequences.length;
        //Start each new sequence at cursorSequence
        for (Sequence sequence : sequencesToAdd)
        {
            sequence.set(cursorSequence);
            updatedSequences[index++] = sequence;
        }
    }
    while (!updater.compareAndSet(holder, currentSequences, updatedSequences));

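    //The cursor may have advanced while the CAS loop was retrying, so bring the new sequences up to the latest cursor value
    cursorSequence = cursor.getCursor();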
    for (Sequence sequence : sequencesToAdd)
    {
        sequence.set(cursorSequence);
    }
}

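/**
 * Atomically remove a specific sequence
 *
 * @param holder the object that owns the field being updated atomically
 * @param sequenceUpdater the updater for that field
 * @param sequence the sequence to remove
 * @param <T> the type of the holder
 * @return true if at least one matching sequence was removed
 */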
static <T> boolean removeSequence(
        final T holder,
        final AtomicReferenceFieldUpdater<T, Sequence[]> sequenceUpdater,
        final Sequence sequence)
{
    int numToRemove;
    Sequence[] oldSequences;
    Sequence[] newSequences;

    do
    {
        oldSequences = sequenceUpdater.get(holder);

        numToRemove = countMatching(oldSequences, sequence);

        if (0 == numToRemove)
        {
            break;
        }

        final int oldSize = oldSequences.length;
        newSequences = new Sequence[oldSize - numToRemove];

        for (int i = 0, pos = 0; i < oldSize; i++)
        {
            final Sequence testSequence = oldSequences[i];
            if (sequence != testSequence)
            {
                newSequences[pos++] = testSequence;
            }
        }
    }
    while (!sequenceUpdater.compareAndSet(holder, oldSequences, newSequences));

    return numToRemove != 0;
}

private static <T> int countMatching(T[] values, final T toMatch)
{
    int numToRemove = 0;
    for (T value : values)
    {
        if (value == toMatch) // Specifically uses identity
        {
            numToRemove++;
        }
    }
    return numToRemove;
}
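To see this copy-on-write pattern in isolation, here is a sketch that replaces Disruptor's Sequence with java.util.concurrent.atomic.AtomicLong and the Cursored lookup with a fixed cursorValue argument; the class GatingHolder is hypothetical. The array is never modified after it is installed, so a reader can iterate over whatever array it read without locking, while writers just retry the CAS whenever another thread swapped in a new array first.

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

// Hypothetical holder mirroring AbstractSequencer.gatingSequences + SequenceGroups.
class GatingHolder {
    private static final AtomicReferenceFieldUpdater<GatingHolder, AtomicLong[]> UPDATER =
            AtomicReferenceFieldUpdater.newUpdater(GatingHolder.class, AtomicLong[].class, "gating");

    // Never mutated in place: every change installs a fresh array via CAS
    volatile AtomicLong[] gating = new AtomicLong[0];

    // Copy, append, CAS; retry if another thread replaced the array in the meantime
    void add(long cursorValue, AtomicLong... toAdd) {
        AtomicLong[] current;
        AtomicLong[] updated;
        do {
            current = UPDATER.get(this);
            updated = Arrays.copyOf(current, current.length + toAdd.length);
            int index = current.length;
            for (AtomicLong s : toAdd) {
                s.set(cursorValue);  // new consumers start at the cursor
                updated[index++] = s;
            }
        } while (!UPDATER.compareAndSet(this, current, updated));
    }

    // Rebuild without the target (matched by identity), CAS; false if it was absent
    boolean remove(AtomicLong target) {
        AtomicLong[] current;
        AtomicLong[] updated;
        do {
            current = UPDATER.get(this);
            int matches = 0;
            for (AtomicLong s : current) {
                if (s == target) matches++;
            }
            if (matches == 0) return false;
            updated = new AtomicLong[current.length - matches];
            int pos = 0;
            for (AtomicLong s : current) {
                if (s != target) updated[pos++] = s;
            }
        } while (!UPDATER.compareAndSet(this, current, updated));
        return true;
    }

    public static void main(String[] args) throws InterruptedException {
        GatingHolder holder = new GatingHolder();
        Thread[] threads = new Thread[4];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(() -> {
                for (int j = 0; j < 1000; j++) holder.add(0L, new AtomicLong());
            });
            threads[i].start();
        }
        for (Thread t : threads) t.join();
        System.out.println(holder.gating.length);  // 4000: no concurrent add is lost
    }
}
```

Because cursorValue is a fixed argument here, the sketch skips the second pass that SequenceGroups.addSequences makes after its CAS to pick up a cursor that may have moved.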

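newBarrier returns a ProcessingSequenceBarrier.
We'll cover SequenceBarrier in detail later; for now, think of it as the object that coordinates consumers. For example, if consumer A depends on consumer B, A must always consume after B: A may only consume what B has already consumed, so A's sequence can never be greater than B's. This coordination is done through a SequenceBarrier: A's barrier tracks B's Sequence. We must also guarantee that every consumer only consumes events that have been published. We won't go deeper here.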

public SequenceBarrier newBarrier(Sequence... sequencesToTrack)
{
    return new ProcessingSequenceBarrier(this, waitStrategy, cursor, sequencesToTrack);
}
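The A-depends-on-B rule comes down to a minimum. Below is a non-blocking sketch of what such a barrier computes; it is my own simplification: SimpleBarrier is hypothetical, AtomicLong stands in for Sequence, and the real ProcessingSequenceBarrier additionally waits through the WaitStrategy it is given instead of just returning a number.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical, non-blocking simplification of a consumer barrier.
final class SimpleBarrier {
    private final AtomicLong cursor;          // producer's published position
    private final AtomicLong[] dependencies;  // upstream consumers' sequences (may be empty)

    SimpleBarrier(AtomicLong cursor, AtomicLong... dependencies) {
        this.cursor = cursor;
        this.dependencies = dependencies.clone();
    }

    // Highest sequence a consumer behind this barrier may process right now:
    // never past what is published, never past any consumer it depends on.
    long availableSequence() {
        long available = cursor.get();
        for (AtomicLong dependency : dependencies) {
            available = Math.min(available, dependency.get());
        }
        return available;
    }

    public static void main(String[] args) {
        AtomicLong cursor = new AtomicLong(9L);     // published up to 9
        AtomicLong consumerB = new AtomicLong(5L);  // B has processed up to 5
        System.out.println(new SimpleBarrier(cursor).availableSequence());            // 9
        System.out.println(new SimpleBarrier(cursor, consumerB).availableSequence()); // 5
    }
}
```

B, with no dependencies, may run up to the cursor, while A, whose barrier tracks B, is held at 5 until B moves on.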

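Now we reach the core of this installment, SingleProducerSequencer. Looking at its structure, it again uses redundant long fields to avoid false sharing. This time there are two hot fields instead of one, so seven longs are placed both before and after them, which avoids false sharing even in the worst case (see my first article).
The two fields are: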

protected long nextValue = Sequence.INITIAL_VALUE;
protected long cachedValue = Sequence.INITIAL_VALUE;

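Both start at -1. Note that this class protects these fields with no memory barriers or locks, so within the Disruptor framework it is not thread-safe. As its name says, though, it is meant for a single producer, so its methods are never called from multiple threads.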
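To make the padding concrete, here is a sketch of two hot fields with seven longs on each side, built through inheritance. The class names are hypothetical and this is my own illustration of the pattern, not the Disruptor source. Inheritance is used because HotSpot in practice lays out superclass fields before subclass fields, whereas it may reorder fields declared within one class; that is JVM behaviour, not something the Java language specification guarantees.

```java
// Hypothetical names; illustrates the padding pattern only.
abstract class ProducerPadBefore {
    protected long p1, p2, p3, p4, p5, p6, p7;        // 56 bytes before the hot fields
}

abstract class ProducerHotFields extends ProducerPadBefore {
    protected long nextValue = -1L;                   // same start as Sequence.INITIAL_VALUE
    protected long cachedValue = -1L;
}

final class PaddedProducerState extends ProducerHotFields {
    protected long p8, p9, p10, p11, p12, p13, p14;   // 56 bytes after the hot fields
}
```

Assuming 64-byte cache lines and 8-byte-aligned longs, any line that contains nextValue or cachedValue can reach at most 56 bytes to either side of them, so it only ever overlaps padding and never another object's data.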
Next come the implementations of AbstractSequencer's abstract methods.
hasAvailableCapacity checks whether there is enough space:

@Override
public boolean hasAvailableCapacity(int requiredCapacity) {
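    //Last sequence this producer has claimed (-1 before anything is claimed)
    long nextValue = this.nextValue;
    //wrapPoint is one full lap behind the furthest position requested, i.e. the sequence that last used that slot.
    //If the slowest consumer (tracked through gatingSequences, which the consumers update) has not reached wrapPoint, that slot still holds an unconsumed event
    long wrapPoint = (nextValue + requiredCapacity) - bufferSize;
    //Disruptor caches a lot: cachedValue holds the last computed minimum of all gatingSequences, so they need not be scanned on every call
    long cachedGatingSequence = this.cachedValue;
    //Only rescan and refresh the cache when wrapPoint is beyond the cached minimum (or the cached value is ahead of nextValue)
    if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue)
    {
        long minSequence = Util.getMinimumSequence(gatingSequences, nextValue);
        this.cachedValue = minSequence;
        //Not enough space: return false
        if (wrapPoint > minSequence)
        {
            return false;
        }
    }
    //wrapPoint does not exceed the minimum gating sequence, so it is safe to produce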
    return true;
}
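The arithmetic is easiest to check without any threads. Below is a standalone copy of the method above in which a plain long[] of consumer positions stands in for Sequence[] gatingSequences; the class CapacityCheck and its minimumSequence helper are hypothetical, with the helper following the documented contract of Util.getMinimumSequence (the smallest of the given minimum and every sequence).

```java
// Hypothetical standalone copy of the cached capacity check.
class CapacityCheck {
    private final int bufferSize;
    private final long[] gatingSequences;  // consumer positions
    long nextValue = -1L;                  // last sequence claimed by the producer
    long cachedValue = -1L;                // cached minimum of the gating sequences

    CapacityCheck(int bufferSize, long[] gatingSequences) {
        this.bufferSize = bufferSize;
        this.gatingSequences = gatingSequences;
    }

    // Smallest of 'minimum' and every sequence, like Util.getMinimumSequence
    static long minimumSequence(long[] sequences, long minimum) {
        for (long s : sequences) {
            minimum = Math.min(minimum, s);
        }
        return minimum;
    }

    boolean hasAvailableCapacity(int requiredCapacity) {
        long wrapPoint = (nextValue + requiredCapacity) - bufferSize;
        long cachedGatingSequence = cachedValue;
        if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue) {
            long minSequence = minimumSequence(gatingSequences, nextValue);
            cachedValue = minSequence;
            if (wrapPoint > minSequence) {
                return false;  // the slowest consumer has not freed that slot yet
            }
        }
        return true;
    }

    public static void main(String[] args) {
        long[] consumer = { -1L };
        CapacityCheck producer = new CapacityCheck(4, consumer);
        System.out.println(producer.hasAvailableCapacity(2));  // true (wrapPoint -3)
        producer.nextValue = 1L;  // as if next(2) had claimed sequences 0 and 1
        consumer[0] = 0L;         // the consumer has processed sequence 0
        System.out.println(producer.hasAvailableCapacity(4));  // false (wrapPoint 1 > 0)
    }
}
```

main replays the numbers from the walkthrough at the end of this section: the first check passes on the cached value alone, while the second forces a rescan that finds the consumer only at 0.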

next claims the next one or n sequences (values) as the positions at which events will be produced:

@Override
public long next() {
    return next(1);
}

@Override
public long next(int n) {
    if (n < 1) {
        throw new IllegalArgumentException("n must be > 0");
    }

    long nextValue = this.nextValue;
    //Same logic as hasAvailableCapacity above, except that next effectively blocks until space is available
    long nextSequence = nextValue + n;
    long wrapPoint = nextSequence - bufferSize;
    long cachedGatingSequence = this.cachedValue;

    if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue) {
        long minSequence;
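        //While wrapPoint is greater than the smallest gating sequence, keep waking blocked consumers through the wait strategy and yield the CPU with LockSupport.parkNanos, until wrapPoint no longer exceeds the minimum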
        while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue))) {
            waitStrategy.signalAllWhenBlocking();
            LockSupport.parkNanos(1L); // TODO: Use waitStrategy to spin?
        }
        //As before, cache the minimum gating sequence
        this.cachedValue = minSequence;
    }

    this.nextValue = nextSequence;

    return nextSequence;
}
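Here is a stripped-down sketch of the blocking claim loop, assuming a single gating sequence held in an AtomicLong and no wait strategy, so there is nobody to signal and the loop only parks. BlockingClaim is a hypothetical name; the control flow follows next(n) above.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.LockSupport;

// Hypothetical single-producer claim loop: parks until the consumer frees enough slots.
class BlockingClaim {
    private final int bufferSize;
    private final AtomicLong consumer;  // stands in for the gating Sequence
    private long nextValue = -1L;       // touched only by the producer thread
    private long cachedValue = -1L;

    BlockingClaim(int bufferSize, AtomicLong consumer) {
        this.bufferSize = bufferSize;
        this.consumer = consumer;
    }

    long next(int n) {
        if (n < 1) {
            throw new IllegalArgumentException("n must be > 0");
        }
        long nextSequence = nextValue + n;
        long wrapPoint = nextSequence - bufferSize;
        if (wrapPoint > cachedValue || cachedValue > nextValue) {
            long minSequence;
            while (wrapPoint > (minSequence = Math.min(nextValue, consumer.get()))) {
                LockSupport.parkNanos(1L);  // back off briefly, then re-read the consumer
            }
            cachedValue = minSequence;
        }
        nextValue = nextSequence;
        return nextSequence;
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicLong consumer = new AtomicLong(-1L);
        BlockingClaim producer = new BlockingClaim(4, consumer);
        System.out.println(producer.next(4));  // 3: an empty ring is filled without waiting
        Thread c = new Thread(() -> {
            LockSupport.parkNanos(50_000_000L);  // pause briefly (may return early)
            consumer.set(0L);                    // consumer finishes sequence 0
        });
        c.start();
        System.out.println(producer.next(1));  // 4: returned only once the consumer reached 0
        c.join();
    }
}
```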

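tryNext tries to claim the next one or n sequences (values) as the positions at which events will be produced, and throws InsufficientCapacityException when there is not enough capacity. The capacity check is simply the hasAvailableCapacity method shown earlier: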

    @Override
    public long tryNext() throws InsufficientCapacityException {
        return tryNext(1);
    }

    @Override
    public long tryNext(int n) throws InsufficientCapacityException {
        if (n < 1) {
            throw new IllegalArgumentException("n must be > 0");
        }

        if (!hasAvailableCapacity(n)) {
            throw InsufficientCapacityException.INSTANCE;
        }

        long nextSequence = this.nextValue += n;

        return nextSequence;
    }
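The difference from next is failing fast instead of waiting. Since InsufficientCapacityException is thrown as the preallocated INSTANCE, a failed tryNext allocates nothing. The sketch below mimics that with a hypothetical NoCapacityException (disabling its stack trace is my own choice) and a hypothetical TryClaim class whose capacity check omits the cachedValue optimisation for brevity.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical single-producer claimer with a non-blocking tryNext (no cachedValue).
class TryClaim {
    private final int bufferSize;
    private final AtomicLong consumer;  // stands in for the gating Sequence
    private long nextValue = -1L;       // last claimed sequence

    TryClaim(int bufferSize, AtomicLong consumer) {
        this.bufferSize = bufferSize;
        this.consumer = consumer;
    }

    private boolean hasAvailableCapacity(int requiredCapacity) {
        long wrapPoint = (nextValue + requiredCapacity) - bufferSize;
        return wrapPoint <= Math.min(nextValue, consumer.get());
    }

    // Fails fast: the caller decides whether to retry, drop the event or back off
    long tryNext(int n) throws NoCapacityException {
        if (n < 1) {
            throw new IllegalArgumentException("n must be > 0");
        }
        if (!hasAvailableCapacity(n)) {
            throw NoCapacityException.INSTANCE;
        }
        return nextValue += n;
    }

    public static void main(String[] args) {
        TryClaim producer = new TryClaim(4, new AtomicLong(-1L));
        int claimed = 0;
        try {
            while (true) {
                producer.tryNext(1);
                claimed++;
            }
        } catch (NoCapacityException e) {
            System.out.println("claimed " + claimed + " slots");  // claimed 4 slots
        }
    }
}

// Hypothetical stand-in for InsufficientCapacityException: one shared instance, no stack trace.
final class NoCapacityException extends Exception {
    static final NoCapacityException INSTANCE = new NoCapacityException();

    private NoCapacityException() {
        super(null, null, false, false);
    }
}
```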

publish publishes an Event:

    @Override
    public void publish(long sequence) {
        //cursor marks the highest sequence that consumers are allowed to consume
        cursor.set(sequence);
        waitStrategy.signalAllWhenBlocking();
    }

    @Override
    public void publish(long lo, long hi) {
        publish(hi);
    }

Other methods:

    @Override
    public void claim(long sequence) {
        nextValue = sequence;
    }

    @Override
    public boolean isAvailable(long sequence) {
        return sequence <= cursor.get();
    }

    @Override
    public long getHighestPublishedSequence(long nextSequence, long availableSequence) {
        return availableSequence;
    }
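With a single producer, sequences are claimed and filled strictly in order, so everything up to the cursor is published. That is why publish(lo, hi) only needs to move the cursor to hi, and why isAvailable and getHighestPublishedSequence can rely on the cursor alone. A minimal sketch of that pair, assuming an AtomicLong cursor whose lazySet stands in for Sequence.set; SingleCursor is a hypothetical name, and the wait-strategy signal is left out.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical single-producer publish/isAvailable pair.
class SingleCursor {
    private final AtomicLong cursor = new AtomicLong(-1L);

    void publish(long sequence) {
        cursor.lazySet(sequence);  // ordered store, comparable to Sequence.set
    }

    // lo..hi were filled in order by the only producer, so moving to hi publishes them all
    void publish(long lo, long hi) {
        publish(hi);
    }

    boolean isAvailable(long sequence) {
        return sequence <= cursor.get();
    }

    public static void main(String[] args) {
        SingleCursor c = new SingleCursor();
        c.publish(0L, 2L);
        System.out.println(c.isAvailable(2L) + " " + c.isAvailable(3L));  // true false
    }
}
```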

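Now let's walk through a simple workflow for SingleProducerSequencer.
Assume the RingBuffer and SingleProducerSequencer below, plus the consumer-side helper SequenceBarrier. Consumers are not drawn; assume a consumer keeps consuming through the SequenceBarrier. The gatingSequences array of the SingleProducerSequencer holds a reference to one Sequence, and the consumer advances that same Sequence as it consumes through the SequenceBarrier, recording how far it has got. Both the producer's and the consumer's sequences count up from 0 and only ever grow; even after they exceed bufferSize, the slot in the RingBuffer is found by taking the sequence modulo bufferSize.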
[Figure: initial state of the RingBuffer (bufferSize = 4), SingleProducerSequencer and SequenceBarrier]
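Suppose the SingleProducerSequencer now wants to put two Events into the RingBuffer, and first calls hasAvailableCapacity(2). The code computes:
wrapPoint = (nextValue + requiredCapacity) - bufferSize = (-1 + 2) - 4 = -3
-3 does not exceed cachedValue (-1), and cachedValue is not ahead of nextValue, so gatingSequences is not checked and true is returned. Given true, the producer claims and fills the two slots (which moves nextValue to 1) and then calls publish to update the cursor. A consumer calling isAvailable, which compares against the cursor, can now see that sequences 0 and 1 are ready to be consumed.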
[Figure: state after sequences 0 and 1 are published]
Next, suppose the consumer consumes one Event and updates its Sequence to 0.
[Figure: state after the consumer's Sequence advances to 0]
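Then the producer wants to produce four Events and calls hasAvailableCapacity(4) to check. The code computes:
wrapPoint = (nextValue + requiredCapacity) - bufferSize = (1 + 4) - 4 = 1
1 > cachedValue (-1), so the gating sequences must be re-checked. The smallest Sequence is now 0, and 1 is still greater than 0, so cachedValue is updated to 0 and false is returned: four more events would overwrite sequence 1, which the consumer has not consumed yet.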
[Figure: state when hasAvailableCapacity(4) returns false]
That completes a simple walk through the production process, and with it our look at SingleProducerSequencer.