歡迎訪問我的GitHub

https://github.com/zq2599/blog_demos

內容:所有原創文章分類彙總及配套原始碼,涉及Java、Docker、Kubernetes、DevOPS等;

《disruptor筆記》系列連結

  1. 快速入門
  2. Disruptor類分析
  3. 環形佇列的基礎操作(不用Disruptor類)
  4. 事件消費知識點小結
  5. 事件消費實戰
  6. 常見場景
  7. 等待策略
  8. 知識點補充(終篇)

本篇概覽

本文是《disruptor筆記》的第七篇,咱們一起閱讀原始碼,學習一個重要的知識點:等待策略,由於Disruptor的原始碼短小精幹、簡單易懂,因此本篇是個輕鬆愉快的原始碼學習之旅;

提前小結

如果您時間不充裕,可以通過以下提前小結的內容,對等待策略有個大體的認識:

  1. BlockingWaitStrategy:用了ReentrantLock的等待&&喚醒機制實現等待邏輯,是預設策略,比較節省CPU
  2. BusySpinWaitStrategy:持續自旋,JDK9之下慎用(最好別用)
  3. DummyWaitStrategy:返回的Sequence值為0,正常環境是用不上的
  4. LiteBlockingWaitStrategy:基於BlockingWaitStrategy,在沒有鎖競爭的時候會省去喚醒操作,但是作者說測試不充分,不建議使用
  5. TimeoutBlockingWaitStrategy:帶超時的等待,超時後會執行業務指定的處理邏輯
  6. LiteTimeoutBlockingWaitStrategy:基於TimeoutBlockingWaitStrategy,在沒有鎖競爭的時候會省去喚醒操作
  7. SleepingWaitStrategy:三段式,第一階段自旋,第二階段執行Thread.yield交出CPU,第三階段睡眠執行時間,反覆的的睡眠
  8. YieldingWaitStrategy:二段式,第一階段自旋,第二階段執行Thread.yield交出CPU
  9. PhasedBackoffWaitStrategy:四段式,第一階段自旋指定次數,第二階段自旋指定時間,第三階段執行Thread.yield交出CPU,第四階段呼叫成員變數的waitFor方法,這個成員變數可以被設定為BlockingWaitStrategy、LiteBlockingWaitStrategy、SleepingWaitStrategy這三個中的一個

關於等待策略

  • 回顧一下前面的文章中例項化Disruptor的程式碼:
disruptor = new Disruptor<>(new OrderEventFactory(),
BUFFER_SIZE,
new CustomizableThreadFactory("event-handler-"));
  • 展開上述構造方法,會見到建立RingBuffer的程式碼,預設使用了BlockingWaitStrategy作為等待策略:
    public static <E> RingBuffer<E> createMultiProducer(EventFactory<E> factory, int bufferSize)
{
return createMultiProducer(factory, bufferSize, new BlockingWaitStrategy());
}
  • 繼續展開上面的createMultiProducer方法,可見每個Sequencer(注意不是Sequence)都有自己的watStrategy成員變數:

  • 這個waitStrategy的最終用途是建立SequenceBarrier的時候,傳給SequenceBarrier做成員變數:

  • 在看看SequenceBarrier是如何使用waitStrategy的,一共兩處用到,第一處如下圖紅框,原來是waitFor方法內部會用到,這個waitFor咱們前面已經瞭解過,對消費者來說,等待環形佇列的指定位置有可用資料時,就是呼叫SequenceBarrier的waitFor完成的:

  • SequenceBarrier第二處用到waitStrategy是喚醒的時候:
    @Override
public void alert()
{
alerted = true;
waitStrategy.signalAllWhenBlocking();
}
  • 現在咱們知道了WaitStrategy的使用場景,接下來看看這個介面有哪些具體實現吧,這樣咱們在程式設計中就知道如何選擇才最適合自己

BlockingWaitStrategy

  • 作為預設的等待策略,BlockingWaitStrategy還有個特點就是程式碼量小(不到百行),很容易理解,其實就是用ReentrantLock+Condition來實現等待和喚醒操作的,如下圖紅框:

  • 如果您更傾向於節省CPU資源,對高吞吐量和低延時的要求相對低一些,那麼BlockingWaitStrategy就適合您了;

BusySpinWaitStrategy(慎用)

  • 前面的BlockingWaitStrategy有個特點,就是一旦環形佇列指定位置來了資料,由於執行緒是等待狀態(底層呼叫了native的UNSAFE.park方法),因此還要喚醒後才能執行業務邏輯,在一些場景中希望資料一到就儘快消費,此時BusySpinWaitStrategy就很合適了,程式碼太簡單,全部貼出:
public final class BusySpinWaitStrategy implements WaitStrategy
{
@Override
public long waitFor(
final long sequence, Sequence cursor, final Sequence dependentSequence, final SequenceBarrier barrier)
throws AlertException, InterruptedException
{
long availableSequence; while ((availableSequence = dependentSequence.get()) < sequence)
{
barrier.checkAlert();
ThreadHints.onSpinWait();
} return availableSequence;
} @Override
public void signalAllWhenBlocking()
{
}
}
  • 上述程式碼顯示,整個while迴圈的關鍵就是ThreadHints.onSpinWait做了什麼,原始碼如下,這裡要格外注意,如果ON_SPIN_WAIT_METHOD_HANDLE為空,意味著外面的while迴圈是個非常消耗CPU的自旋
    public static void onSpinWait()
{
if (null != ON_SPIN_WAIT_METHOD_HANDLE)
{
try
{
ON_SPIN_WAIT_METHOD_HANDLE.invokeExact();
}
catch (final Throwable ignore)
{
}
}
}
  • ON_SPIN_WAIT_METHOD_HANDLE為空是很可怕的事情,咱們來看看它是何方神聖?程式碼還是在ThreadHints.java中,如下所示,真相一目瞭然,它就是Thread類的onSpinWait方法,如果Thread類沒有onSpinWait方法,那麼使用BusySpinWaitStrategy作為等待策略就有很高的代價了,環形佇列裡沒有資料時消費執行緒會執行自旋,很耗費CPU:
static
{
final MethodHandles.Lookup lookup = MethodHandles.lookup(); MethodHandle methodHandle = null;
try
{
methodHandle = lookup.findStatic(Thread.class, "onSpinWait", methodType(void.class));
}
catch (final Exception ignore)
{
} ON_SPIN_WAIT_METHOD_HANDLE = methodHandle;
}
  • 好吧,還剩兩個問題:Thread類有沒有onSpinWait方法還不能確定嗎?這個onSpinWait方法是何方神聖?

  • 去看JDK官方文件,如下圖,原來這方法是從JDK9才有的,所以對於JDK8使用者來說來說,選用BusySpinWaitStrategy就意味著要面對沒做啥事兒的while迴圈了:

  • 第二個問題,onSpinWait方法幹了些啥?前面的官方文件,以欣宸的英語水平顯然是無法理解的,去看stackoverflow吧,如下圖,簡單的說,就是告訴CPU當前執行緒處於迴圈查詢的狀態,CPU得知後就會排程更多CPU資源給其他執行緒:

  • 至此真像大白:環形佇列的條件就緒後,BusySpinWaitStrategy策略是通過whlie死迴圈來做到快速響應的,如果JDK是9或者更高版本,這個死迴圈帶來的CPU損耗由Thread.onSpinWait幫助緩解,如果JDK版本低於9,這裡就是個簡單的while死迴圈,至於這種死迴圈有多消耗CPU,您可以寫段簡單程式碼感受一下...

  • 難怪Disruptor原始碼中會提醒最好是將使用此例項的執行緒繫結到指定CPU核:

DummyWaitStrategy

固定返回0,個人覺得這個策略在正常開發中用不上,因為環形佇列可用位置始終是0的話,不論是生產還是消費都難以實現:

LiteBlockingWaitStrategy

  • 看名字,LiteBlockingWaitStrategy是BlockingWaitStrategy策略的輕量級實現,在鎖沒有競爭的時候(例如獨立消費的場景),會省略掉喚醒操作,不過如下圖紅框所示,作者說他沒有充分驗證過正確性,因此建議只用於體驗,太好了,這個策略我不學了!!!

TimeoutBlockingWaitStrategy

  • 顧名思義,TimeoutBlockingWaitStrategy表示只等待某段時長,超過了就算超時,其程式碼和BlockingWaitStrategy類似,只是等待的時候有個時長限制,如下圖,一目瞭然:

  • 其實我對丟擲異常後的處理很感興趣,去看看吧,外面是熟悉的BatchEventProcessor類,熟悉的processEvents方法,如下圖,每次超時異常都交給notifyTimeout處理,而外部的主流程不受影響,依舊不斷的從環形佇列中等待和獲取資料:

  • 進入notifyTImeout方法,可見實際上是交給成員變數timeoutHandler去處理的,而且處理過程中發生的任何異常都會被捕獲,不會丟擲去影響外部呼叫:

  • 再來看看成員變數是哪來的,如下圖,真相大白,咱們開發的EventHandler實現類,如果也實現了Timeouthandler,就被當做成員變數timeoutHandler了:

  • 至此TimeoutBlockingWaitStrategy也搞清楚了:用於有時間限制的場景,每次等待超時後都會呼叫業務定製的超時處理邏輯,這個邏輯寫到EventHandler實現類中,這個實現類要實現Timeouthandler介面

LiteTimeoutBlockingWaitStrategy

  • LiteTimeoutBlockingWaitStrategy與TimeoutBlockingWaitStrategy的關係,就像BlockingWaitStrategy與LiteBlockingWaitStrategy的關係:作為TimeoutBlockingWaitStrategy的變體,有TimeoutBlockingWaitStrategy的超時處理特性,而且沒有鎖競爭的時候,省略掉喚醒操作;
  • 作者說LiteBlockingWaitStrategy可用於體驗,但正確性並未經過充分驗證,但是在LiteTimeoutBlockingWaitStrategy的註釋中沒有看到這種說法,看樣子這是個靠譜的等待策略,可以用,用在有超時處理的需求,而且沒有鎖競爭的場景(例如獨立消費)

SleepingWaitStrategy

  • 和前面幾個不同的是,SleepingWaitStrategy沒有用到鎖,這意味這無需呼叫signalAllWhenBlocking方法做喚醒處理,相當於省去了生產執行緒的通知操作,官方原始碼註釋有這麼句話引起了我的興趣,如下圖紅框,大意是該策略在效能和CPU資源消耗之間取得了平衡,接下來去看看關鍵程式碼,來了解這個特性:

  • 如下圖,等到可用資料的過程是個死迴圈:

  • 接下來是關鍵程式碼了,如下圖,可見整個等待過程分為三段:計數器高於100時就只有一個減一的操作(最快響應),計數器在100到0之間時每次都交出CPU執行時間(最省資源),其他時候就睡眠固定時間:

YieldingWaitStrategy

  • 看過SleepingWaitStrategy之後,再看YieldingWaitStrategy就很容易理解了,和SleepingWaitStrategy相比,YieldingWaitStrategy先做指定次數的自旋,然後不斷的交出CPU時間:

  • 由於在不斷的執行Thread.yield()方法,因此該策略雖然很消耗CPU,不過一旦其他執行緒有CPU需求,很容易從這個執行緒得到;

PhasedBackoffWaitStrategy

  • 最後是PhasedBackoffWaitStrategy,該策略的特點是將整個等待過程分成下圖的四段,四個方塊代表一個時間線上的四個階段:

  • 這裡說明一下上圖的四個階段:
  1. 首先是自旋指定的次數,預設10000次;
  2. 自旋過後,開始帶計時的自旋,執行的時長是spinTimeoutNanos的值;
  3. 執行時長達到spinTimeoutNanos的值後,開始執行Thread.yield()交出CPU資源,這個邏輯的執行時長是yieldTimeoutNanos-spinTimeoutNanos;
  4. 執行時長達到yieldTimeoutNanos-spinTimeoutNanos的值後,開始呼叫fallbackStrategy.waitFor,這個呼叫沒有時間或者次數限制;
  • 現在問題來了fallbackStrategy是何方神聖?PhasedBackoffWaitStrategy類準備了三個靜態方法,咱們可以按需選用,讓fallbackStrategy是BlockingWaitStrategy、LiteBlockingWaitStrategy、SleepingWaitStrategy這三個中的一個:
public static PhasedBackoffWaitStrategy withLock(
long spinTimeout,
long yieldTimeout,
TimeUnit units)
{
return new PhasedBackoffWaitStrategy(
spinTimeout, yieldTimeout,
units, new BlockingWaitStrategy());
} public static PhasedBackoffWaitStrategy withLiteLock(
long spinTimeout,
long yieldTimeout,
TimeUnit units)
{
return new PhasedBackoffWaitStrategy(
spinTimeout, yieldTimeout,
units, new LiteBlockingWaitStrategy());
} public static PhasedBackoffWaitStrategy withSleep(
long spinTimeout,
long yieldTimeout,
TimeUnit units)
{
return new PhasedBackoffWaitStrategy(
spinTimeout, yieldTimeout,
units, new SleepingWaitStrategy(0));
}
  • 至此,Disruptor的九種等待策略就全部分析完畢了,除了選用等待策略的時候更加得心應手,還有個收穫就是積攢了閱讀優秀原始碼的經驗,在讀原始碼的路上更加有信心了;

你不孤單,欣宸原創一路相伴

  1. Java系列
  2. Spring系列
  3. Docker系列
  4. kubernetes系列
  5. 資料庫+中介軟體系列
  6. DevOps系列

歡迎關注公眾號:程式設計師欣宸

微信搜尋「程式設計師欣宸」,我是欣宸,期待與您一同暢遊Java世界...

https://github.com/zq2599/blog_demos