1. 程式人生 > >LMAX Disruptor——一個高效能、低延遲且簡單的框架

LMAX Disruptor——一個高效能、低延遲且簡單的框架

Disruptor是一個用於線上程間通訊的高效低延時的訊息元件,它像個增強的佇列,並且它是讓LMAX Exchange跑的如此之快的一個關鍵創新。關於什麼是Disruptor、為何它很重要以及它的工作原理方面的資訊都呈爆炸性增長 —— 這些文章很適合開始學習Disruptor,還可跟著LMAX BLOG深入學習。這裡還有一份更詳細的白皮書

雖然disruptor模式使用起來很簡單,但是建立多個消費者以及它們之間的依賴關係需要的樣板程式碼太多了。為了能快速又簡單適用於99%的場景,我為Disruptor模式準備了一個簡單的領域特定語言。例如,為建立一個消費者的“四邊形模式”:

在這種情況下,只要生產者(P1)將元素放到ring buffer上,消費者C1和C2就可以並行處理這些元素。但是消費者C3必須一直等到C1和C2處理完之後,才可以處理。在現實世界中的對應的案例就像:在處理實際的業務邏輯(C3)之前,需要校驗資料(C1),以及將資料寫入磁碟(C2)。

用原生的Disruptor語法來建立這些消費者的話程式碼如下:

Executor executor = Executors.newCachedThreadPool();
BatchHandler handler1 = new MyBatchHandler1();
BatchHandler handler2 = new MyBatchHandler2();
BatchHandler handler3 = new MyBatchHandler3()
RingBuffer ringBuffer = new RingBuffer(ENTRY_FACTORY, RING_BUFFER_SIZE);
ConsumerBarrier consumerBarrier1 = ringBuffer.createConsumerBarrier();
BatchConsumer consumer1 = new BatchConsumer(consumerBarrier1, handler1);
BatchConsumer consumer2 = new BatchConsumer(consumerBarrier1, handler2);
ConsumerBarrier consumerBarrier2 =
ringBuffer.createConsumerBarrier(consumer1, consumer2);
BatchConsumer consumer3 = new BatchConsumer(consumerBarrier2, handler3);
executor.execute(consumer1);
executor.execute(consumer2);
executor.execute(consumer3);
ProducerBarrier producerBarrier =
ringBuffer.createProducerBarrier(consumer3);

在以上這段程式碼中,我們不得不建立那些個handler(就是那些個MyBatchHandler例項),外加消費者屏障,BatchConsumer例項,然後在他們各自的執行緒中處理這些消費者。DSL能幫我們完成很多建立工作,最終的結果如下:

Executor executor = Executors.newCachedThreadPool();
BatchHandler handler1 = new MyBatchHandler1();
BatchHandler handler2 = new MyBatchHandler2();
BatchHandler handler3 = new MyBatchHandler3();
DisruptorWizard dw = new DisruptorWizard(ENTRY_FACTORY,
	RING_BUFFER_SIZE, executor);
dw.consumeWith(handler1, handler2).then(handler3);
ProducerBarrier producerBarrier = dw.createProducerBarrier();

我們甚至可以在一個更復雜的六邊形模式中構建一個並行消費者鏈:

dw.consumeWith(handler1a, handler2a);
dw.after(handler1a).consumeWith(handler1b);
dw.after(handler2a).consumeWith(handler2b);
dw.after(handler1b, handler2b).consumeWith(handler3);
ProducerBarrier producerBarrier = dw.createProducerBarrier();

這個領域特定語言剛剛誕生不久,歡迎任何反饋,也歡迎大家從github上fork並改進它。