Disruptor 中的2種事件消費模式
在Disruptor快速入門中,我們在構造 Disruptor 的時候,明確指定了單生產者模式,那麼消費者呢?有幾個消費者執行緒來處理訊息?每個事件會被處理幾次?
當我們呼叫 disruptor.handleEventsWith 設定訊息的處理器時,我們提供的 Event Handler 會被包裝為 BatchEventProcessor。
public EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers) { return createEventProcessors(new Sequence[0], handlers); } EventHandlerGroup<T> createEventProcessors( final Sequence[] barrierSequences, final EventHandler<? super T>[] eventHandlers) { checkNotStarted(); final Sequence[] processorSequences = new Sequence[eventHandlers.length]; final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences); for (int i = 0, eventHandlersLength = eventHandlers.length; i < eventHandlersLength; i++) { final EventHandler<? super T> eventHandler = eventHandlers[i]; // 這裡 final BatchEventProcessor<T> batchEventProcessor = new BatchEventProcessor<T>(ringBuffer, barrier, eventHandler); if (exceptionHandler != null) { batchEventProcessor.setExceptionHandler(exceptionHandler); } consumerRepository.add(batchEventProcessor, eventHandler, barrier); processorSequences[i] = batchEventProcessor.getSequence(); } updateGatingSequencesForNextInChain(barrierSequences, processorSequences); return new EventHandlerGroup<T>(this, consumerRepository, processorSequences); }
BatchEventProcessor 實現了 Runnable 介面。

在 Disruptor 啟動的時候,就會根據上述構造的消費者相關資訊(ConsumerRepository)啟動對應的執行緒去輪詢訊息並處理。

新執行緒就會一直從 RingBuffer 中輪詢訊息並呼叫對應的事件處理器處理。
通過上述的分析,我們可以知道消費者執行緒的個數取決於我們構造 Disruptor 時提供的 EventHandler 的個數。所以第一種實現多消費者模式的方法就是提供多個 EventHandler。
多個消費者各自處理事件(Multicast)
給 Disruptor 提供多個 EventHandler 就會開啟多個消費者工作執行緒,每個消費者都會處理所有的事件,是一種多播模式。
EventHandler<LogEvent>[] consumers = new LogEventConsumer[WORKER_SIZE]; for (int i = 0; i < consumers.length; i++) { consumers[i] = new LogEventConsumer(); } disruptor.handleEventsWith(consumers);

接下來看下原始碼為何如此?消費者想要獲取到 RingBuffer 中的元素,就需要從 Sequnce 中取得可用的序列號,否則就會執行等待策略。前面已經說過, EventHandler 最終封裝為 BatchEventProcessor,每個 BatchEventProcessor 在執行 EventHandler 相應邏輯之前都會先獲取可用的序列號,因為每個 BatchEventProcessor 獨立維護了一個 Sequence 物件,所以每個事件都會被所有的消費者處理一遍。
// 從0開始 private final Sequence sequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE); public void run() { if (!running.compareAndSet(false, true)) { throw new IllegalStateException("Thread is already running"); } sequenceBarrier.clearAlert(); notifyStart(); T event = null; // 獲取下一個序列號 long nextSequence = sequence.get() + 1L; try { while (true) { try { // 等待有可取的事件 final long availableSequence = sequenceBarrier.waitFor(nextSequence); if (batchStartAware != null) { batchStartAware.onBatchStart(availableSequence - nextSequence + 1); } while (nextSequence <= availableSequence) { event = dataProvider.get(nextSequence); // 處理訊息 eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence); nextSequence++; } sequence.set(availableSequence); } catch (final TimeoutException e) { notifyTimeout(sequence.get()); } catch (final AlertException ex) { if (!running.get()) { break; } } catch (final Throwable ex) { exceptionHandler.handleEventException(ex, nextSequence, event); sequence.set(nextSequence); nextSequence++; } } } finally { notifyShutdown(); running.set(false); } }
多個消費者合作處理一批事件
上面的方式是每個 Consumer 都會處理相同的訊息,可以聯絡 EventBus,Kafka裡面的 ConsumerGroup。那麼如果想多個 Consumer 協作處理一批訊息呢?此時可以利用 Disruptor 的 WorkPool 支援,我們定製相應的執行緒池(Executor)來處理 EventWorker 任務。

使用這種模式的一種場景是處理每個事件比較耗時,開啟多個執行緒來加快處理。
// Fixed Thread Pool ExecutorService executor = new ThreadPoolExecutor(WORKER_SIZE, WORKER_SIZE, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(10), new ThreadFactory() { private int counter = 0; private String prefix = "DisruptorWorker"; @Override public Thread newThread(Runnable r) { return new Thread(r, prefix + "-" + counter++); } }); // 環形陣列的容量,必須要是2的次冪 int bufferSize = 1024; // 構造 Disruptor Disruptor<LogEvent> disruptor = new Disruptor<>(new LogEventFactory(), bufferSize, executor, ProducerType.SINGLE, new YieldingWaitStrategy()); // 設定消費者 WorkHandler<LogEvent>[] consumers = new LogEventConsumer[WORKER_SIZE]; for (int i = 0; i < consumers.length; i++) { consumers[i] = new LogEventConsumer(); } disruptor.handleEventsWithWorkerPool(consumers); // 啟動 Disruptor disruptor.start();
現在上述形式的 Disruptor 構造器已廢棄,可以採用下面這種方式。
RingBuffer<LogEvent> ringBuffer = RingBuffer.create(ProducerType.SINGLE, new LogEventFactory(), bufferSize, new YieldingWaitStrategy()); SequenceBarrier barriers = ringBuffer.newBarrier(); WorkerPool<LogEvent> workerPool = new WorkerPool<LogEvent>(ringBuffer, barriers, null, consumers); ringBuffer.addGatingSequences(workerPool.getWorkerSequences()); workerPool.start(executor);
接下來分析怎麼做到一個事件只處理一次的。在使用 WorkPool 時,我們提供的事件處理器最終會被封裝為 WorkProcessor,裡面的 run 方法便揭示了原因:所有的消費者都是從同一個 Sequnce 中取可用的序列號。

public void run() { if (!running.compareAndSet(false, true)) { throw new IllegalStateException("Thread is already running"); } sequenceBarrier.clearAlert(); notifyStart(); boolean processedSequence = true; long cachedAvailableSequence = Long.MIN_VALUE; long nextSequence = sequence.get(); T event = null; while (true) { try { // if previous sequence was processed - fetch the next sequence and set // that we have successfully processed the previous sequence // typically, this will be true // this prevents the sequence getting too far forward if an exception // is thrown from the WorkHandler if (processedSequence) { processedSequence = false; do { // 每個 WorkPool 裡面的消費者都是從同一個 Sequnce 中取可用的序列號 nextSequence = workSequence.get() + 1L; sequence.set(nextSequence - 1L); } while (!workSequence.compareAndSet(nextSequence - 1L, nextSequence)); } if (cachedAvailableSequence >= nextSequence) { // 其他都是常規操作 event = ringBuffer.get(nextSequence); workHandler.onEvent(event); processedSequence = true; } else { cachedAvailableSequence = sequenceBarrier.waitFor(nextSequence); } } catch (final TimeoutException e) { notifyTimeout(sequence.get()); } catch (final AlertException ex) { if (!running.get()) { break; } } catch (final Throwable ex) { // handle, mark as processed, unless the exception handler threw an exception exceptionHandler.handleEventException(ex, nextSequence, event); processedSequence = true; } } notifyShutdown(); running.set(false); }
一個要注意的問題
在使用 WorkPool 的時候務必要保證一個 Consumer 要對應一個執行緒,否則當 RingBuffer 滿的時候,Producer 和 Consumer 都會阻塞,一個 ofollow,noindex" target="_blank">例子 。


正因為存在這個問題,所以下面形式的 Disruptor 構造器已廢棄。
@Deprecated public Disruptor( final EventFactory<T> eventFactory, final int ringBufferSize, final Executor executor, final ProducerType producerType, final WaitStrategy waitStrategy) { this(RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy), executor); }
推薦使用的是提供 ThreadFactory 形式的構造器,後續會根據事件處理器的個數來新增對應的執行緒。
public Disruptor( final EventFactory<T> eventFactory, final int ringBufferSize, final ThreadFactory threadFactory, final ProducerType producerType, final WaitStrategy waitStrategy) { this( RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy), new BasicExecutor(threadFactory)); }
參考
Exchange/disruptor/wiki/Introduction" target="_blank" rel="nofollow,noindex"> https:// github.com/LMAX-Exchang e/disruptor/wiki/Introduction
https:// groups.google.com/forum /#!topic/lmax-disruptor/nuPAT7mhjiE
完整程式碼 地址