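High-Concurrency Data Structure: Disruptor Explained (2)
Sequence (continued)
As covered previously, Sequence avoids false sharing on its core field, value, by surrounding it with unused padding longs, both before and after it. So how is value itself read and written?
To answer that, we first need to know about the Unsafe class; see my other article, "Java Unsafe Class".
Unsafe exposes a number of methods implemented natively (in C++ inside the JVM). Sequence uses them as follows:
it obtains the Unsafe instance, uses it to look up the memory offset of the value field within a Sequence object, and then writes and CAS-updates value through that offset.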
com.lmax.disruptor.Sequence.java
public class Sequence extends RhsPadding
{
static final long INITIAL_VALUE = -1L;
private static final Unsafe UNSAFE;
private static final long VALUE_OFFSET;
static
{
UNSAFE = Util.getUnsafe();
try
{
VALUE_OFFSET = UNSAFE.objectFieldOffset(Value.class.getDeclaredField("value"));
}
catch (final Exception e)
{
throw new RuntimeException(e);
}
}
/**
* The default initial value is -1 (INITIAL_VALUE)
*/
public Sequence()
{
this(INITIAL_VALUE);
}
public Sequence(final long initialValue)
{
UNSAFE.putOrderedLong(this, VALUE_OFFSET, initialValue);
}
public long get()
{
return value;
}
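/**
* Ordered (lazy) write: uses Unsafe to store the new value directly at value's memory location.
* Cheaper than a volatile write: it only keeps earlier stores from being reordered after it (StoreStore), with no StoreLoad fence
*/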
public void set(final long value)
{
UNSAFE.putOrderedLong(this, VALUE_OFFSET, value);
}
/**
* Volatile write through Unsafe: full volatile semantics, including a StoreLoad fence, so it costs more than set()
*/
public void setVolatile(final long value)
{
UNSAFE.putLongVolatile(this, VALUE_OFFSET, value);
}
/**
* CAS through Unsafe: atomically sets value to newValue only if it currently equals expectedValue
*/
public boolean compareAndSet(final long expectedValue, final long newValue)
{
return UNSAFE.compareAndSwapLong(this, VALUE_OFFSET, expectedValue, newValue);
}
public long incrementAndGet()
{
return addAndGet(1L);
}
public long addAndGet(final long increment)
{
long currentValue;
long newValue;
do
{
currentValue = get();
newValue = currentValue + increment;
}
while (!compareAndSet(currentValue, newValue));
return newValue;
}
@Override
public String toString()
{
return Long.toString(get());
}
}
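Unsafe is an internal JDK API. As a sketch of the same idea on Java 9+, the three write modes used by Sequence can be expressed with java.lang.invoke.VarHandle: setRelease roughly corresponds to putOrderedLong, setVolatile to putLongVolatile, and compareAndSet to compareAndSwapLong. The class names below (SeqLhsPadding, SeqValue, SeqRhsPadding, VarHandleSequence) are hypothetical and not part of Disruptor. The padding relies on HotSpot laying out superclass fields before subclass fields, which is a JVM implementation detail rather than a language guarantee.

```java
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;

// Hypothetical classes mirroring the "padding / value / padding" hierarchy idea.
class SeqLhsPadding { protected long p1, p2, p3, p4, p5, p6, p7; }

class SeqValue extends SeqLhsPadding { protected volatile long value; }

class SeqRhsPadding extends SeqValue { protected long p9, p10, p11, p12, p13, p14, p15; }

class VarHandleSequence extends SeqRhsPadding {
    static final long INITIAL_VALUE = -1L;
    private static final VarHandle VALUE;

    static {
        try {
            // Plays the role of UNSAFE.objectFieldOffset(...): a handle on the `value` field.
            VALUE = MethodHandles.lookup().findVarHandle(SeqValue.class, "value", long.class);
        } catch (ReflectiveOperationException e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    VarHandleSequence() { this(INITIAL_VALUE); }

    VarHandleSequence(long initialValue) { VALUE.setRelease(this, initialValue); }

    long get() { return value; }                              // volatile read

    void set(long v) { VALUE.setRelease(this, v); }           // counterpart of putOrderedLong

    void setVolatile(long v) { VALUE.setVolatile(this, v); }  // counterpart of putLongVolatile

    boolean compareAndSet(long expected, long newValue) {
        return VALUE.compareAndSet(this, expected, newValue); // counterpart of compareAndSwapLong
    }

    long incrementAndGet() { return addAndGet(1L); }

    long addAndGet(long increment) {
        long current;
        long next;
        do {
            current = get();
            next = current + increment;
        } while (!compareAndSet(current, next)); // retry if another thread changed value meanwhile
        return next;
    }
}
```

The addAndGet loop is the same read, compute, CAS, retry pattern as Sequence.addAndGet above; only the mechanism for touching the field differs.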
Producer
SingleProducerSequencer
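Let's start with the Producer. Disruptor supports both a single producer and multiple producers; we first focus on SingleProducerSequencer, the core class for the single-producer case. In its class hierarchy, SingleProducerSequencer ultimately extends AbstractSequencer, which implements the Sequencer interface, and Sequencer in turn extends the Cursored and Sequenced interfaces.
A brief description of each interface:
Cursored interface: a class implementing it keeps track of one particular sequence, its cursor, exposed through getCursor. For example, when producing messages, the producer needs to know its current position in the ringBuffer; this position is updated as production proceeds, and getCursor is how it is read.
Sequenced interface: a class implementing it provides an ordered storage structure, which is a defining characteristic of RingBuffer. When a Producer produces an Event, it first claims the next Sequence, then fills in the Event, and finally publishes it; only after that can the Event be consumed.
- getBufferSize: returns the size of the ringBuffer
- hasAvailableCapacity: checks whether there is enough free space
- remainingCapacity: returns the remaining free space in the ringBuffer
- next: claims the next sequence (or the next n sequences) as the position(s) for producing events
- tryNext: like next, but throws InsufficientCapacityException instead of waiting when capacity is insufficient
- publish: publishes the Event(s)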
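The claim, fill, publish protocol of Sequenced can be illustrated with a toy, single-threaded ring. Everything here (ToyRing and its methods) is hypothetical and only borrows the method names from the list above; capacity checks are deliberately left out. Note that next(n) returns the highest claimed sequence, so the batch is hi - n + 1 .. hi, and that publishing hi alone makes the whole batch visible, which is also what SingleProducerSequencer.publish(lo, hi) does later in this article.

```java
// Hypothetical toy ring illustrating next(n) -> fill -> publish(lo, hi).
// No capacity check: a real sequencer would first make sure the slots are free.
class ToyRing {
    private final String[] slots;
    private final int mask;
    private long nextValue = -1L;        // highest claimed sequence (producer-private)
    private volatile long cursor = -1L;  // highest published sequence (read by consumers)

    ToyRing(int size) {
        if (size < 1 || Integer.bitCount(size) != 1) {
            throw new IllegalArgumentException("size must be a power of 2");
        }
        slots = new String[size];
        mask = size - 1;
    }

    // Claims n slots and returns the highest claimed sequence.
    long next(int n) { return nextValue += n; }

    // Fills a claimed slot; sequences wrap onto the array via the mask.
    void set(long seq, String v) { slots[(int) (seq & mask)] = v; }

    // Single producer: moving the cursor to hi makes lo..hi visible at once (lo is unused).
    void publish(long lo, long hi) { cursor = hi; }

    boolean isAvailable(long seq) { return seq <= cursor; }

    String get(long seq) { return slots[(int) (seq & mask)]; }
}
```

For example, claiming three slots on an empty ring returns hi = 2, so the batch is 0..2; none of those slots is available to readers until publish(0, 2) moves the cursor to 2.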
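Sequencer interface: extends the Cursored and Sequenced interfaces and, on top of them, adds methods related to both consumption and production. One particularly important part of its design is the GatingSequence:
As we will see later, the head of the RingBuffer is maintained by a Sequence object called cursor, which coordinates producers filling data into the RingBuffer. The Sequence marking the tail of the queue does not live in the RingBuffer; it is maintained by the consumers. This keeps tail maintenance lock-free, but it means the producer has to track more information to decide whether the RingBuffer is full. That is what the gating sequences are for: they track the relevant (consumer) Sequences.
- INITIAL_CURSOR_VALUE: -1, the starting value of a sequence
- claim: claims a specific sequence; only used when the ringBuffer is initialised to a specific starting value (generally only with multiple producers)
- isAvailable: non-blocking; checks whether a sequence has been published and can be consumed
- addGatingSequences: adds the given sequences to the gatingSequences that must be tracked
- removeGatingSequence: removes a sequence from the gatingSequences
- newBarrier: given a set of sequences to track, creates a SequenceBarrier. Consumers use a SequenceBarrier to determine whether a position is ready to be consumed
- getMinimumSequence: returns the smallest sequence among this ringBuffer's gatingSequences
- getHighestPublishedSequence: returns the highest sequence that can be safely read
- newPoller: related to EventPoller, which is not used and not covered here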
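getMinimumSequence boils down to a linear scan. The following sketch uses a plain long[] as a stand-in for Sequence[] (a simplification), and GatingMath is a hypothetical name. The minimum parameter acts as an upper bound, in the same way the producer code later in this article passes nextValue to Util.getMinimumSequence(gatingSequences, nextValue).

```java
// Hypothetical helper: smallest gating value, capped at `minimum`.
final class GatingMath {
    private GatingMath() {}

    // With no gating sequences registered, `minimum` itself is returned.
    static long getMinimumSequence(long[] gatingValues, long minimum) {
        for (long v : gatingValues) {
            minimum = Math.min(minimum, v);
        }
        return minimum;
    }
}
```

The slowest consumer therefore determines how far the producer may run ahead.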
Next, the abstract class AbstractSequencer implements the Sequencer interface. It defines 5 fields:
private static final AtomicReferenceFieldUpdater<AbstractSequencer, Sequence[]> SEQUENCE_UPDATER = AtomicReferenceFieldUpdater.newUpdater(AbstractSequencer.class, Sequence[].class, "gatingSequences");
protected final int bufferSize;
protected final WaitStrategy waitStrategy;
protected final Sequence cursor = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
protected volatile Sequence[] gatingSequences = new Sequence[0];
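- SEQUENCE_UPDATER: a helper for atomically updating gatingSequences
- bufferSize: the size of the target RingBuffer
- waitStrategy: this sequencer's wait strategy (covered later)
- cursor: the producer's position, initially -1
- gatingSequences: described above
The constructor enforces a few constraints: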
public AbstractSequencer(int bufferSize, WaitStrategy waitStrategy)
{
if (bufferSize < 1)
{
throw new IllegalArgumentException("bufferSize must not be less than 1");
}
if (Integer.bitCount(bufferSize) != 1)
{
throw new IllegalArgumentException("bufferSize must be a power of 2");
}
this.bufferSize = bufferSize;
this.waitStrategy = waitStrategy;
}
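bufferSize must be at least 1 and must be a power of 2. The reasons were given in my first article; in short, a power-of-two size lets a sequence be mapped to a slot with a bit mask (sequence & (bufferSize - 1)) instead of a modulo.
getCursor and getBufferSize are implemented as simple getters: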
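To make the power-of-two requirement concrete, here is a minimal sketch with hypothetical helper names: the first method repeats the constructor's check, and the second maps a non-negative sequence to a slot with a mask, which gives the same result as % for such sizes without a division.

```java
// Hypothetical helpers illustrating why bufferSize must be a power of 2.
final class RingIndex {
    private RingIndex() {}

    // Same test as the AbstractSequencer constructor: at least 1, exactly one bit set.
    static boolean isPowerOfTwo(int n) {
        return n >= 1 && Integer.bitCount(n) == 1;
    }

    // For a power-of-two bufferSize and a non-negative sequence,
    // sequence & (bufferSize - 1) == sequence % bufferSize.
    static int slotIndex(long sequence, int bufferSize) {
        return (int) (sequence & (bufferSize - 1));
    }
}
```

With bufferSize 4, for instance, sequence 5 lands in slot 1, exactly where 5 % 4 would put it.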
@Override
public final long getCursor()
{
return cursor.get();
}
@Override
public final int getBufferSize()
{
return bufferSize;
}
addGatingSequences and removeGatingSequence update the array atomically:
public final void addGatingSequences(Sequence... gatingSequences)
{
SequenceGroups.addSequences(this, SEQUENCE_UPDATER, this, gatingSequences);
}
public boolean removeGatingSequence(Sequence sequence)
{
return SequenceGroups.removeSequence(this, SEQUENCE_UPDATER, sequence);
}
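The static helper methods of the atomic update utility class (SequenceGroups):
/**
* Atomically adds sequences
*
* @param holder the object that owns the field being updated atomically
* @param updater the updater for that field
* @param cursor the cursor used to initialise the new sequences
* @param sequencesToAdd the sequences to add
* @param <T>
*/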
static <T> void addSequences(
final T holder,
final AtomicReferenceFieldUpdater<T, Sequence[]> updater,
final Cursored cursor,
final Sequence... sequencesToAdd)
{
long cursorSequence;
Sequence[] updatedSequences;
Sequence[] currentSequences;
//Until the CAS succeeds, keep re-reading currentSequences and building updatedSequences with all new sequences appended
do
{
currentSequences = updater.get(holder);
updatedSequences = copyOf(currentSequences, currentSequences.length + sequencesToAdd.length);
cursorSequence = cursor.getCursor();
int index = currentSequences.length;
//Initialise each new sequence to cursorSequence
for (Sequence sequence : sequencesToAdd)
{
sequence.set(cursorSequence);
updatedSequences[index++] = sequence;
}
}
while (!updater.compareAndSet(holder, currentSequences, updatedSequences));
cursorSequence = cursor.getCursor();
for (Sequence sequence : sequencesToAdd)
{
sequence.set(cursorSequence);
}
}
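/**
* Atomically removes the specified sequence (every occurrence of it)
*
* @param holder the object that owns the field being updated atomically
* @param sequenceUpdater the updater for that field
* @param sequence the sequence to remove
* @param <T>
* @return true if at least one occurrence was removed
*/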
static <T> boolean removeSequence(
final T holder,
final AtomicReferenceFieldUpdater<T, Sequence[]> sequenceUpdater,
final Sequence sequence)
{
int numToRemove;
Sequence[] oldSequences;
Sequence[] newSequences;
do
{
oldSequences = sequenceUpdater.get(holder);
numToRemove = countMatching(oldSequences, sequence);
if (0 == numToRemove)
{
break;
}
final int oldSize = oldSequences.length;
newSequences = new Sequence[oldSize - numToRemove];
for (int i = 0, pos = 0; i < oldSize; i++)
{
final Sequence testSequence = oldSequences[i];
if (sequence != testSequence)
{
newSequences[pos++] = testSequence;
}
}
}
while (!sequenceUpdater.compareAndSet(holder, oldSequences, newSequences));
return numToRemove != 0;
}
private static <T> int countMatching(T[] values, final T toMatch)
{
int numToRemove = 0;
for (T value : values)
{
if (value == toMatch) // Specifically uses identity
{
numToRemove++;
}
}
return numToRemove;
}
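The pattern used by SequenceGroups is copy-on-write plus CAS: the gating array is never modified in place; each add or remove builds a new array and swaps it in with AtomicReferenceFieldUpdater.compareAndSet, retrying if another thread got there first. The sketch below assumes AtomicLong as a stand-in for Sequence and a hypothetical GatingHolder class. For brevity it omits the step in which addSequences sets each new sequence to the current cursor value, once inside the loop and once after the CAS.

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

// Hypothetical holder of gating counters kept in an immutable-by-convention array.
class GatingHolder {
    private static final AtomicReferenceFieldUpdater<GatingHolder, AtomicLong[]> UPDATER =
        AtomicReferenceFieldUpdater.newUpdater(GatingHolder.class, AtomicLong[].class, "gating");

    private volatile AtomicLong[] gating = new AtomicLong[0];

    void add(AtomicLong seq) {
        AtomicLong[] current;
        AtomicLong[] updated;
        do {
            current = UPDATER.get(this);
            updated = Arrays.copyOf(current, current.length + 1); // copy, never mutate in place
            updated[current.length] = seq;
        } while (!UPDATER.compareAndSet(this, current, updated)); // lost a race: start over
    }

    boolean remove(AtomicLong seq) {
        AtomicLong[] current;
        AtomicLong[] updated;
        do {
            current = UPDATER.get(this);
            int matches = 0;
            for (AtomicLong s : current) if (s == seq) matches++; // identity, like countMatching
            if (matches == 0) return false;
            updated = new AtomicLong[current.length - matches];
            int pos = 0;
            for (AtomicLong s : current) if (s != seq) updated[pos++] = s;
        } while (!UPDATER.compareAndSet(this, current, updated));
        return true;
    }

    // Readers just take the current array reference; it is never changed afterwards.
    AtomicLong[] snapshot() { return UPDATER.get(this); }
}
```

Readers such as getMinimumSequence can iterate a snapshot without locking, because a published array is never modified.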
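newBarrier returns a ProcessingSequenceBarrier:
We will cover SequenceBarrier in detail later; for now, think of it as the object that coordinates consumers. For example, if consumer A depends on consumer B, A must always consume after B: A may only consume events that B has already consumed, so A's sequence must never exceed B's. This coordination is achieved by creating A's SequenceBarrier with B's Sequence among the sequences it tracks (sequencesToTrack). At the same time, every consumer must only consume events that have been published. We won't go deeper here.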
public SequenceBarrier newBarrier(Sequence... sequencesToTrack)
{
return new ProcessingSequenceBarrier(this, waitStrategy, cursor, sequencesToTrack);
}
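The dependency rule above amounts to taking a minimum: a consumer may process up to the smaller of the producer's published cursor and the sequences of the consumers it depends on. The helper below is hypothetical and deliberately simplified; the real ProcessingSequenceBarrier also goes through the WaitStrategy and getHighestPublishedSequence.

```java
// Hypothetical helper: highest sequence a consumer may process.
final class BarrierMath {
    private BarrierMath() {}

    // Bounded by the published cursor and by every upstream consumer's sequence.
    static long availableFor(long cursor, long... dependentSequences) {
        long available = cursor;
        for (long d : dependentSequences) {
            available = Math.min(available, d);
        }
        return available;
    }
}
```

With cursor 10 and B at 6, for instance, A may process up to sequence 6 even though 7..10 are already published.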
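Now we come to the core of this article, SingleProducerSequencer. Looking at its structure, it again uses redundant long fields to avoid CPU false sharing. This time there are two hot fields instead of one, so 7 longs are placed before them and 7 after, which prevents false sharing even in the worst case (see my first article).
The two fields are: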
protected long nextValue = Sequence.INITIAL_VALUE;
protected long cachedValue = Sequence.INITIAL_VALUE;
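Both start at -1. Note that these are plain fields with no memory barriers or synchronization around them, so within the Disruptor framework this class is not thread-safe. As its name says, though, it is meant for a single producer, so its methods are never called from multiple threads.
What follows are its implementations of the abstract methods left open by AbstractSequencer.
hasAvailableCapacity checks whether there is enough space: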
@Override
public boolean hasAvailableCapacity(int requiredCapacity) {
//Highest sequence claimed so far; the next claim starts at nextValue + 1
long nextValue = this.nextValue;
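//wrapPoint is the sequence that, one lap (bufferSize) earlier, occupied the last slot we want to claim. If the consumers (whose Sequences are in gatingSequences) have not passed it yet, claiming would overwrite unconsumed events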
long wrapPoint = (nextValue + requiredCapacity) - bufferSize;
//Disruptor caches heavily: this caches the minimum of all gatingSequences so they need not be scanned on every call
long cachedGatingSequence = this.cachedValue;
//Re-read and refresh the cache if wrapPoint is beyond the cached minimum, or if the cached value is ahead of nextValue
if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue)
{
long minSequence = Util.getMinimumSequence(gatingSequences, nextValue);
this.cachedValue = minSequence;
//Not enough space: return false
if (wrapPoint > minSequence)
{
return false;
}
}
//wrapPoint does not exceed the minimum gating sequence, so it is safe to produce
return true;
}
next claims the next sequence (or the next n sequences) as the position(s) for producing events:
@Override
public long next() {
return next(1);
}
@Override
public long next(int n) {
if (n < 1) {
throw new IllegalArgumentException("n must be > 0");
}
long nextValue = this.nextValue;
//Same logic as hasAvailableCapacity, except that here the call effectively blocks
long nextSequence = nextValue + n;
long wrapPoint = nextSequence - bufferSize;
long cachedGatingSequence = this.cachedValue;
if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue) {
long minSequence;
//While wrapPoint is beyond the minimum gating sequence, keep waking consumers and yield the CPU via LockSupport.parkNanos, until it no longer is
while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue))) {
waitStrategy.signalAllWhenBlocking();
LockSupport.parkNanos(1L); // TODO: Use waitStrategy to spin?
}
//As before, cache the minimum gating sequence
this.cachedValue = minSequence;
}
this.nextValue = nextSequence;
return nextSequence;
}
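The blocking behavior of next can be reproduced outside Disruptor with a single consumer sequence. This sketch assumes an AtomicLong stand-in for gatingSequences and a hypothetical BlockingClaimer class; it keeps the wrapPoint check, the cached minimum, and the parkNanos back-off from the code above, but drops the waitStrategy.signalAllWhenBlocking() call because there is no WaitStrategy here.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.LockSupport;

// Hypothetical single-producer claimer with one consumer (gating) sequence.
// Only the producer thread calls next(); nextValue and cachedValue are plain fields.
class BlockingClaimer {
    private final int bufferSize;
    private final AtomicLong consumerSequence; // stand-in for gatingSequences
    private long nextValue = -1L;              // highest claimed sequence
    private long cachedValue = -1L;            // cached minimum gating sequence

    BlockingClaimer(int bufferSize, AtomicLong consumerSequence) {
        this.bufferSize = bufferSize;
        this.consumerSequence = consumerSequence;
    }

    long next(int n) {
        if (n < 1) {
            throw new IllegalArgumentException("n must be > 0");
        }
        long nextSequence = nextValue + n;
        long wrapPoint = nextSequence - bufferSize;
        if (wrapPoint > cachedValue || cachedValue > nextValue) {
            long minSequence;
            // Same role as Util.getMinimumSequence(gatingSequences, nextValue)
            while (wrapPoint > (minSequence = Math.min(consumerSequence.get(), nextValue))) {
                LockSupport.parkNanos(1L); // back off until the consumer frees the slot
            }
            cachedValue = minSequence;
        }
        nextValue = nextSequence;
        return nextSequence;
    }
}
```

With bufferSize 4 and the consumer still at -1, next(4) returns 3 immediately, while a following next(1) parks until the consumer has advanced to 0, because slot 0 must be consumed before sequence 4 can reuse it.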
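tryNext tries to claim the next sequence (or the next n sequences) as the position(s) for producing events, and throws InsufficientCapacityException when capacity is insufficient. The capacity check is done with the hasAvailableCapacity method shown above: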
@Override
public long tryNext() throws InsufficientCapacityException {
return tryNext(1);
}
@Override
public long tryNext(int n) throws InsufficientCapacityException {
if (n < 1) {
throw new IllegalArgumentException("n must be > 0");
}
if (!hasAvailableCapacity(n)) {
throw InsufficientCapacityException.INSTANCE;
}
long nextSequence = this.nextValue += n;
return nextSequence;
}
publish publishes Events:
@Override
public void publish(long sequence) {
//cursor marks the highest sequence that may be consumed
cursor.set(sequence);
waitStrategy.signalAllWhenBlocking();
}
@Override
public void publish(long lo, long hi) {
publish(hi);
}
Other methods:
@Override
public void claim(long sequence) {
nextValue = sequence;
}
@Override
public boolean isAvailable(long sequence) {
return sequence <= cursor.get();
}
@Override
public long getHighestPublishedSequence(long nextSequence, long availableSequence) {
return availableSequence;
}
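Below is a simple walkthrough of how SingleProducerSequencer works.
Suppose we have a RingBuffer of size 4, a SingleProducerSequencer, and the consumer-side helper SequenceBarrier. Consumers are not shown; assume there is one that keeps consuming through the SequenceBarrier. The gatingSequences array of SingleProducerSequencer holds a reference to one Sequence, which the consumer updates to record how far it has consumed. Both the production and consumption Sequences count up from zero without wrapping; even after they exceed bufferSize, a sequence is mapped onto the RingBuffer by taking it modulo bufferSize.
Now suppose the SingleProducerSequencer wants to put two Events into the RingBuffer and first calls hasAvailableCapacity(2). The computation is:
wrapPoint = (nextValue + requiredCapacity) - bufferSize = (-1 + 2) - 4 = -3
-3 is not greater than cachedValue (-1), so the gatingSequences are not checked and the method returns true. The producer then claims sequences 0 and 1 (for example with next(2), which advances nextValue to 1), fills them, and calls publish to update cursor. A consumer calling isAvailable can now tell from cursor that sequences 0 and 1 are ready to be consumed.
Suppose the consumer then consumes one Event and updates its Sequence to 0.
Next, the producer wants to produce four Events and calls hasAvailableCapacity(4). The computation is:
wrapPoint = (nextValue + requiredCapacity) - bufferSize = (1 + 4) - 4 = 1
1 > cachedValue (-1), so the gating sequences must be re-checked. The minimum Sequence is now 0, but 1 is still greater than it, so cachedValue is updated to 0 and the method returns false.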
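The walkthrough can be replayed with a small single-threaded model. CapacityTrace is hypothetical: it has exactly one gating sequence (an AtomicLong stand-in), and its tryNext throws IllegalStateException where Disruptor throws InsufficientCapacityException. Its hasAvailableCapacity follows the same steps as the code above.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical single-threaded model of the capacity logic, with one gating sequence.
class CapacityTrace {
    final int bufferSize;
    final AtomicLong consumer;  // the single gating sequence, updated by the consumer
    long nextValue = -1L;       // highest claimed sequence
    long cachedValue = -1L;     // cached minimum gating sequence
    long cursor = -1L;          // highest published sequence

    CapacityTrace(int bufferSize, AtomicLong consumer) {
        this.bufferSize = bufferSize;
        this.consumer = consumer;
    }

    boolean hasAvailableCapacity(int requiredCapacity) {
        long wrapPoint = (nextValue + requiredCapacity) - bufferSize;
        if (wrapPoint > cachedValue || cachedValue > nextValue) {
            // Same role as Util.getMinimumSequence(gatingSequences, nextValue)
            long minSequence = Math.min(consumer.get(), nextValue);
            cachedValue = minSequence;
            return wrapPoint <= minSequence;
        }
        return true; // wrapPoint is within the cached minimum: safe to produce
    }

    // Disruptor throws InsufficientCapacityException here; this model uses IllegalStateException.
    long tryNext(int n) {
        if (!hasAvailableCapacity(n)) {
            throw new IllegalStateException("insufficient capacity");
        }
        nextValue += n;
        return nextValue;
    }

    void publish(long sequence) {
        cursor = sequence;
    }

    public static void main(String[] args) {
        AtomicLong consumer = new AtomicLong(-1L);
        CapacityTrace p = new CapacityTrace(4, consumer);
        System.out.println(p.hasAvailableCapacity(2)); // true: wrapPoint = (-1 + 2) - 4 = -3
        p.publish(p.tryNext(2));                       // claims and publishes 0..1; nextValue = 1
        consumer.set(0L);                              // the consumer has processed sequence 0
        System.out.println(p.hasAvailableCapacity(4)); // false: wrapPoint = (1 + 4) - 4 = 1 > 0
        System.out.println(p.cachedValue);             // 0
    }
}
```

Running main prints true, false, and 0, matching the two checks and the cache refresh described above.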
That completes a simple production example, and with it our look at SingleProducerSequencer.