Disruptor 快速入門
為了提高系統的吞吐量,通常會採用佇列來實現批量處理,釋出訂閱模式,非同步等場景。在JDK的內建佇列中,一般實際中會使用 ArrayBlockingQueue,一方面是有界的,另一方面是通過加鎖實現的執行緒安全,比如在使用執行緒池的時候最佳實踐就是指定了一個 ArrayBlockingQueue 作為任務佇列。
ExecutorService service = new ThreadPoolExecutor(4, 4, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(CAPACITY), new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { // 實現自己的拒絕策略 } });
LMAX公司開發的 Disruptor 通過無鎖(CAS),避免快取行偽共享,環形陣列(RingBuffer)實現了更高的效能,Storm,Log4j2中都使用了 Disruptor。
本文是 Disruptor 快速入門篇。
引入依賴
依賴配置。
<dependency> <groupId>com.lmax</groupId> <artifactId>disruptor</artifactId> <version>3.3.7</version> </dependency>
定義事件,事件工廠
定義一個簡單的事件,這裡假設要處理的是日誌訊息。
@Data public class LogEvent { private String msg; } public class LogEventFactory implements EventFactory<LogEvent> { @Override public LogEvent newInstance() { return new LogEvent(); } }
事件工廠用於 Disruptor 在 RingBuffer 中預分配空間 ,從 RingBuffer 的原始碼可以看到這一點。
private void fill(EventFactory<E> eventFactory) { for (int i = 0; i < bufferSize; i++) { entries[BUFFER_PAD + i] = eventFactory.newInstance(); } }
消費者
定義消費者來處理我們的事件。
public class LogEventConsumer implements EventHandler<LogEvent> { @Override public void onEvent(LogEvent event, long sequence, boolean endOfBatch) throws Exception { System.out.println(Thread.currentThread().getName() + " | Event : " + event); } }
生產者
定義事件的源頭,裡面的事件轉換器(EventTranslatorOneArg)會把輸出的引數轉為我們的事件型別。
public class LogEventProducer { private final RingBuffer<LogEvent> ringBuffer; public LogEventProducer(RingBuffer<LogEvent> ringBuffer) { this.ringBuffer = ringBuffer; } private static final EventTranslatorOneArg<LogEvent, String> TRANSLATOR = new EventTranslatorOneArg<LogEvent, String>() { public void translateTo(LogEvent event, long sequence, String bb) { event.setMsg(bb); } }; public void onData(String msg){ ringBuffer.publishEvent(TRANSLATOR, msg); } }
組裝 Disruptor
把上述元件組裝起來就可以了,Disruptor 構造器中的兩個引數-生產者型別 ProducerType(單個,或者多個?),WaitStrategy(等待RingBuffer中對應序列號可用的策略)會影響 Disruptor 的效能。
public class LogEventMain { public static void main(String[] args) { LogEventFactory factory = new LogEventFactory(); // 環形陣列的容量,必須要是2的次冪 int bufferSize = 1024; // 構造 Disruptor Disruptor<LogEvent> disruptor = new Disruptor<>(factory, bufferSize, DaemonThreadFactory.INSTANCE, ProducerType.SINGLE, new YieldingWaitStrategy()); // 設定消費者 disruptor.handleEventsWith(new LogEventConsumer()); // 啟動 Disruptor disruptor.start(); // 生產者要使用 Disruptor 的環形陣列 RingBuffer<LogEvent> ringBuffer = disruptor.getRingBuffer(); LogEventProducer producer = new LogEventProducer(ringBuffer); // 模擬訊息傳送 for (int i = 0; i < 10000; i++) { producer.onData(String.format("msg-%s", i)); } } }
參考
ofollow,noindex"> https:// github.com/LMAX-Exchang e/disruptor/wiki/Getting-Started