1. 程式人生 > >disruptor 高效能之道

disruptor 高效能之道

disruptor是一個高效能的執行緒間非同步通訊的框架,即在同一個JVM程序中的多執行緒間訊息傳遞。應用disruptor知名專案有如下的一些:Storm, Camel, Log4j2,還有目前的美團點評技術團隊也有很多不少的應用,或者說有一些借鑑了它的設計機制。 下面就跟著筆者一起去領略下disruptor高效能之道吧~

disruptor是一款開源的高效能佇列框架,github地址為 https://github.com/LMAX-Exchange/disruptor

分析disruptor,只要把event的生產和消費流程弄懂,基本上didsruptor的七寸就已經抓住了。話不多說,趕緊上車,筆者以下面程式碼為例講解disruptor:

public static void main(String[] args) {
    Disruptor<StringEvent> disruptor = new Disruptor<>(StringEvent::new, 1024,
            new PrefixThreadFactory("consumer-pool-", new AtomicInteger(0)), ProducerType.MULTI,
            new BlockingWaitStrategy());
 
    // 註冊consumer並啟動
    disruptor.handleEventsWith((EventHandler<StringEvent>) (event, sequence, endOfBatch) -> {
        System.out.println(Util.threadName() 
+ "onEvent " + event); }); disruptor.start(); // publisher邏輯 Executor executor = Executors.newFixedThreadPool(2, new PrefixThreadFactory("publisher-pool-", new AtomicInteger(0))); while (true) { for (int i = 0; i < 2; i++) { executor.execute(() -> { Util.sleep(
1); disruptor.publishEvent((event, sequence, arg0) -> { event.setValue(arg0 + " " + sequence); }, "hello world"); }); } Util.sleep(1000); } }

event生產流程

event的生產是從 RingBuffer.publishEvent 開始的,event生產流程步驟如下:
  • 獲取待插入(到ringBuffer的)位置,相當於先佔個位
  • 往該位置上設定event
  • 設定sequence對應event的標誌,通知consumer
public <A> void publishEvent(EventTranslatorOneArg<E, A> translator, A arg0)
{
    // 獲取當前要設定的sequence序號,然後進行設定並通知消費者
    final long sequence = sequencer.next();
    translateAndPublish(translator, sequence, arg0);
}
 
// 獲取下一個sequence,直到獲取到位置才返回
public long next(int n) {
    long current;
    long next;
     
    do {
        // 獲取當前ringBuffer的可寫入sequence
        current = cursor.get();
        next = current + n;
 
        long wrapPoint = next - bufferSize;
        long cachedGatingSequence = gatingSequenceCache.get();
 
        if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current) {
            // 如果當前沒有空位置寫入,獲取多個consumer中消費進度最小的那個的消費進度
            long gatingSequence = Util.getMinimumSequence(gatingSequences, current);
 
            if (wrapPoint > gatingSequence) {
                // 阻塞1ns,然後continue
                LockSupport.parkNanos(1); // TODO, should we spin based on the wait strategy?
                continue;
            }
 
            gatingSequenceCache.set(gatingSequence);
        }
        // cas設定ringBuffer的sequence
        else if (cursor.compareAndSet(current, next)) {
            break;
        }
    } while (true);
 
    return next;
}
 
private <A> void translateAndPublish(EventTranslatorOneArg<E, A> translator, long sequence, A arg0) {
    try {
        // 設定event
        translator.translateTo(get(sequence), sequence, arg0);
    } finally {
        sequencer.publish(sequence);
    }
}
public void publish(final long sequence) {
    // 1. 設定availableBuffer,表示對應的event是否設定完成,consumer執行緒中會用到
    //   - 注意,到這裡時,event已經設定完成,但是consumer還不知道該sequence對應的event是否設定完成,
    //   - 所以需要設定availableBuffer中sequence對應event的sequence number
    // 2. 通知consumer
    setAvailable(sequence);
    waitStrategy.signalAllWhenBlocking();
}

從translateAndPublish中看,如果使用者的設定event方法丟擲異常,這時event物件是不完整的,那麼publish到consumer端,consumer消費的不是完整的資料怎麼辦呢?在translateAndPublish中需不需要在異常情況下reset event物件呢?關於這個問題筆者之前是有疑問的,關於這個問題筆者提了一個issue,可點選 https://github.com/LMAX-Exchange/disruptor/issues/244 進行檢視。

筆者建議在consumer消費完event之後,進行reset event操作,這樣避免下次設定event異常consumer時取到不完整的資料,比如log4j2中的AsyncLogger中處理完log4jEvent之後就會呼叫clear方法進行重置event。

event消費流程

event消費流程入口是BatchEventProcessor.processEvents,event消費流程步驟:
  • 獲取當前consumer執行緒消費的offset,即nextSequence
  • 從ringBuffer獲取可用的sequence,沒有新的event時,會根據consmer阻塞策略進行執行某些動作
  • 獲取event,然後執行event回撥
  • 設定當前consumer執行緒的消費進度
private void processEvents() {
    T event = null;
    long nextSequence = sequence.get() + 1L;
 
    while (true) {
        try {
            // 獲取可用的sequence,預設直到有可用sequence時才返回
            final long availableSequence = sequenceBarrier.waitFor(nextSequence);
            if (batchStartAware != null) {
                batchStartAware.onBatchStart(availableSequence - nextSequence + 1);
            }
 
            // 執行消費回撥動作,注意,這裡獲取到一個批次event,可能有多個,個數為availableSequence-nextSequence + 1
            // nextSequence == availableSequence表示該批次只有一個event
            while (nextSequence <= availableSequence) {
                // 獲取nextSequence位置上的event
                event = dataProvider.get(nextSequence);
                // 使用者自定義的event 回撥
                eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
                nextSequence++;
            }
 
            // 設定當前consumer執行緒的消費進度sequence
            sequence.set(availableSequence);
        } catch (final Throwable ex) {
            exceptionHandler.handleEventException(ex, nextSequence, event);
            sequence.set(nextSequence);
            nextSequence++;
        }
    }
}
 
public long waitFor(final long sequence)
        throws AlertException, InterruptedException, TimeoutException{
    long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this);
 
    if (availableSequence < sequence) {
        return availableSequence;
    }
 
    // 獲取ringBuffer中可安全讀的最大的sequence number,該資訊存在availableBuffer中的sequence
    // 在MultiProducerSequencer.publish方法中會設定
    return sequencer.getHighestPublishedSequence(sequence, availableSequence);
}
 
// 預設consumer阻塞策略 BlockingWaitStrategy
public long waitFor(long sequence, Sequence cursorSequence, Sequence dependentSequence, SequenceBarrier barrier)
    throws AlertException, InterruptedException
{
    long availableSequence;
    if (cursorSequence.get() < sequence) {
        // 當前ringBuffer的sequence小於sequence,阻塞等待
        // event生產之後會喚醒
        synchronized (mutex) {
            while (cursorSequence.get() < sequence) {
                barrier.checkAlert();
                mutex.wait();
            }
        }
    }
 
    while ((availableSequence = dependentSequence.get()) < sequence) {
        barrier.checkAlert();
        ThreadHints.onSpinWait();
    }
 
    return availableSequence;
}

從上面的event消費流程來看,消費執行緒會讀取ringBuffer的sequence,然後更新本消費執行緒內的offset(消費進度sequence),如果有多個event的話,那麼就是廣播消費模式了(單consumer執行緒內還是順序消費),如果不想讓event被廣播消費(重複消費),可使用如下方法新增consumer執行緒(WorkHandler是叢集消費,EventHandler是廣播消費):

disruptor.handleEventsWithWorkerPool((WorkHandler<StringEvent>) event -> {
    System.out.println(Util.threadName() + "onEvent " + event);
});

disruptor高效能之道

棄用鎖機制改用CAS

event生產流程中獲取並自增sequence時用的就是CAS,獲取之後該sequence對應位置的操作只會在單執行緒,沒有了併發問題。

叢集消費模式下獲取sequence之後也會使用CAS設定為sequence新值,設定本地消費進度,然後再執行獲取event並執行回撥邏輯。

注意,disruptor中較多地方使用了CAS,但並不代表完全沒有了鎖機制,比如預設consumer阻塞策略 BlockingWaitStrategy發揮作用時,consumer消費執行緒就會阻塞,只不過這隻會出現在event生產能力不足是才會存在。如果consumer消費不足,大量event生產導致ringBuffer爆滿,這時event生產執行緒就會輪詢呼叫LockSupport.parkNanos(1),這裡的成本也不容小覷(涉及到執行緒切換損耗)。

  避免偽共享引入緩衝行填充

偽共享講的是多個CPU時的123級快取的問題,通常,快取是以快取行的方式讀取資料,如果A、B兩個變數被緩衝在同一行之內,那麼對於其中一個的更新會導致另一個緩衝無效,需要從記憶體中讀取,這種無法充分利用快取行的問題就是偽共享。disruptor相關程式碼如下:

class LhsPadding {
    protected long p1, p2, p3, p4, p5, p6, p7;
}
class Value extends LhsPadding {
    protected volatile long value;
}
  使用RingBuffer作為資料儲存容器

ringBuffer是一個環形佇列,本質是一個數組,size為2的冪次方(方便做&操作),資料位置sequence值會和size做&操作得出陣列下標,然後進行資料的讀寫操作(只在同一個執行緒內,無併發問題)。

  小結

disruptor初衷是為了解決記憶體佇列的延遲問題,作為一個高效能佇列,包括Apache Storm、Camel、Log4j 2在內的很多知名專案都在使用。disruptor的重要機制就是CAS和RingBuffer,藉助於它們兩個實現資料高效的生產和消費

disruptor多生產者多消費者模式下,因為RingBuffer資料的寫入是分為2步的(先獲取到個sequence,然後寫入資料),如果獲取到sequence之後,生產者寫入RingBuffer較慢,consumer消費較快,那麼生產者最終會拖慢consumer消費進度,這一點需注意(如果已經消費到生產者佔位的前一個數據了,那麼consumer會執行對應的阻塞策略)。在實際使用過程中,如果consumer消費邏輯耗時較長,可以封裝成任務交給執行緒池來處理,避免consumer端拖慢生成者的寫入速度。

disruptor的設計對於開發者來說有哪些借鑑的呢?儘量減少競爭,避免多執行緒對同一資料做操作,比如disruptor使用CAS獲取只會在一個執行緒內進行讀寫的event物件,這種思想其實已經在JDK的thread本地記憶體中有所體現;儘量複用物件,避免大量的記憶體申請釋放,增加GC損耗,disruptor通過複用event物件來保證讀寫時不會產生物件GC問題;選擇合適資料結構,disruptor使用ringBuffer,環形陣列來實現資料高效讀寫。

 

參考資料:

1、https://tech.meituan.com/disruptor.html