歡迎訪問我的GitHub

https://github.com/zq2599/blog_demos

內容:所有原創文章分類彙總及配套原始碼,涉及Java、Docker、Kubernetes、DevOPS等;

《disruptor筆記》系列連結

  1. 快速入門
  2. Disruptor類分析
  3. 環形佇列的基礎操作(不用Disruptor類)
  4. 事件消費知識點小結
  5. 事件消費實戰
  6. 常見場景
  7. 等待策略
  8. 知識點補充(終篇)

本篇概覽

  • 通過前文的實戰,咱們對Disruptor有了初步認識,藉助com.lmax.disruptor.dsl.Disruptor類可以輕鬆完成以下操作:
  1. 環形佇列初始化
  2. 指定事件消費者
  3. 啟動消費者執行緒
  • 接下來要面對兩個問題:
  1. 深入瞭解Disruptor類是如何完成上述操作的;
  2. 對Disruptor類有了足夠了解時,嘗試不用Disruptor,自己動手操作環形佇列,實現訊息的生產和消費,這樣做的目的是加深對Disruptor內部的認識,做到知其所以然;
  • 接下來咱們先解決第一個問題吧,結合Disruptor物件的原始碼來看看上述三個操作到底做了什麼;

環形佇列初始化

  • 環形佇列初始化發生在例項化Disruptor物件的時候,即Disruptor的構造方法:
public Disruptor(final EventFactory<T> eventFactory, final int ringBufferSize, final ThreadFactory threadFactory)
{
this(RingBuffer.createMultiProducer(eventFactory, ringBufferSize), new BasicExecutor(threadFactory));
}
  • RingBuffer.createMultiProducer方法內部例項化了RingBuffer,如下圖紅框:

  • 記下第一個重要知識點:建立RingBuffer物件;

指定事件消費者

  • 在前文中,下面這行程式碼指定了事件由StringEventHandler消費:
disruptor.handleEventsWith(new StringEventHandler(eventCountPrinter));
  • 檢視handleEventsWith方法的內部:
public final EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers)
{
return createEventProcessors(new Sequence[0], handlers);
}
  • 展開createEventProcessors方法,如下圖,請重點關注建立SequenceBarrier和BatchEventProcessor等操作:

  • 展開上圖紅框四中的updateGatingSequencesForNextInChain方法,如下圖,紅框中的ringBuffer.addGatingSequences需要重點關注:

  • 小結一下,disruptor.handleEventsWith方法涉及到四個重要知識點:
  1. 建立SequenceBarrier物件,用於接收ringBuffer中的可消費事件
  2. 建立BatchEventProcessor,負責消費事件
  3. 繫結BatchEventProcessor物件的異常處理類
  4. 呼叫ringBuffer.addGatingSequences,將消費者的Sequence傳給ringBuffer

啟動消費者執行緒

  • 前文已通過日誌確定了消費事件的邏輯是在一個獨立的執行緒中執行的,啟動消費者執行緒的程式碼如下:
disruptor.start();
  • 展開start方法,如下可見,關鍵程式碼是consumerInfo.start(executor):
    public RingBuffer<T> start()
{
checkOnlyStartedOnce();
for (final ConsumerInfo consumerInfo : consumerRepository)
{
consumerInfo.start(executor);
} return ringBuffer;
}
  • ConsumerInfo是介面,對應的實現類有EventProcessorInfo和WorkerPoolInfo兩種,這裡應該是哪種呢?既然來源是consumerRepository,這就要看當初是怎麼存入consumerRepository的,前面在分析createEventProcessors方法時,下圖紅框中的consumerRepository.add被忽略了,現在需要進去看看:

  • 進去後一目瞭然,可見ConsumerInfo的實現是EventProcessorInfo:

  • 所以,回到前面對consumerInfo.start(executor)方法的分析,這裡要看的就是EventProcessorInfo的start方法了,如下圖,非常簡單,就是啟動一個執行緒執行eventprocessor(這個eventprocessor是BatchEventProcessor物件):

  • 小結一下,disruptor.start方法涉及到一個重要知識點:
  1. 啟動獨立執行緒,用來執行消費事件的業務邏輯;

消費事件的邏輯

  • 為了理解訊息處理邏輯,還要重點關注BatchEventProcessor.processEvents方法,如下圖所示,其實也很簡單,就是不停的從環形佇列取出可用的事件,然後再更新自己的Sequence,相當於標記已經消費到哪裡了:

總結

最後總結Disruptor類的重要功能:

  1. 建立環形佇列(RingBuffer物件)
  2. 建立SequenceBarrier物件,用於接收ringBuffer中的可消費事件
  3. 建立BatchEventProcessor,負責消費事件
  4. 繫結BatchEventProcessor物件的異常處理類
  5. 呼叫ringBuffer.addGatingSequences,將消費者的Sequence傳給ringBuffer
  6. 啟動獨立執行緒,用來執行消費事件的業務邏輯
  • 聰明的您一定會發現,本文並沒有全面分析Disruptor類的原始碼,例如after、shutdown等方法都沒有提到,確實如此,欣宸在此給您道歉了,本篇的重點是找出那些與基本功能有關程式碼,為後面的實戰提供理論指導(不用Disruptor類實現訊息生產消費的實戰),因此很多高階功能都跳過了;

理解官方流程圖

  • 此時再看官方流程圖,聰明的您應該很快就能理解此圖表達的意思:每個消費者都有自己的Sequence,通過此Sequence取得自己在環形佇列中消費的位置,再通過SequenceBarrier來等待可用事件的出現,等到事件出現了就用get方法取出具體的事件,給EventHandler來處理:

後續預告

  • 此時,咱們對Disruptor類已經有了比較深入的理解,接下來的文章,咱們會嘗試不用Disruptor類,僅憑著對RingBuffer物件的操作來實現以下三種功能:
  1. 100個事件,單個消費者消費;
  2. 100個事件,三個消費者,每個都獨自消費這個100個事件;
  3. 100個事件,三個消費者共同消費這個100個事件;

你不孤單,欣宸原創一路相伴

  1. Java系列
  2. Spring系列
  3. Docker系列
  4. kubernetes系列
  5. 資料庫+中介軟體系列
  6. DevOps系列

歡迎關注公眾號:程式設計師欣宸

微信搜尋「程式設計師欣宸」,我是欣宸,期待與您一同暢遊Java世界...

https://github.com/zq2599/blog_demos