ArrayBlcokingQueue,LinkedBlockingQueue與Disruptor三種佇列對比與分析
一、基本介紹
ArrayBlcokingQueue,LinkedBlockingQueue是jdk中內建的阻塞佇列,網上對它們的分析已經很多,主要有以下幾點:
1、底層實現機制不同,ArrayBlcokingQueue是基於陣列的,LinkedBlockingQueue是基於連結串列的;
2、初始化方式不同,ArrayBlcokingQueue是有界的,初始化時必須指定佇列的大小;LinkedBlockingQueue可以是無界的,但如果初始化時指定了佇列大小,也可以做為有界佇列使用;
3、鎖機制實現不同,ArrayBlcokingQueue生產和消費使用的是同一把鎖,並沒有做鎖分離;LinkedBlockingQueue中生產、消費分別通過putLock與takeLock保證同步,進行了鎖的分離;
使用的過程中,根據應該場景提供了可選插入和刪除策略,我們需要掌握和區分
1、插入操作
//佇列未滿時,返回true;佇列滿則丟擲IllegalStateException(“Queue full”)異常 add(e); //佇列未滿時,直接插入沒有返回值;佇列滿時會阻塞等待,一直等到佇列未滿時再插入。 put(e); //佇列未滿時,返回true;佇列滿時返回false。非阻塞立即返回。 offer(e); //設定等待的時間,如果在指定時間內還不能往佇列中插入資料則返回false,插入成功返回true。 offer(e, timeout, unit);
2、刪除操作
//佇列不為空時,返回隊首值並移除;佇列為空時丟擲NoSuchElementException()異常 remove(); //佇列不為空返回隊首值並移除;當佇列為空時會阻塞等待,一直等到佇列不為空時再返回隊首值。 queue.take(); //佇列不為空時返回隊首值並移除;佇列為空時返回null。非阻塞立即返回。 queue.poll(); //設定等待的時間,如果在指定時間內佇列還未孔則返回null,不為空則返回隊首值 queue.poll(timeout, unit)
Disruptor框架是由LMAX公司開發的一款高效的無鎖記憶體佇列。
Disruptor的最大特點就是高效能,它的內部與眾不同的使用了環形佇列(RingBuffer)來代替普通的線型佇列,相比普通佇列環形佇列不需要針對性的同步head和tail頭尾指標,減少了執行緒協作的複雜度,再加上它本身基於無鎖操作的特性,從而可以達到了非常高的效能;
在使用Disruptor框架時,我們需要注意以下幾個方面
1、Disruptor的構造
/** * * * @param eventFactory定義的事件工廠 * @param ringBufferSize環形佇列RingBuffer的大小,必須是2的N次方 * @param threadFactory消費者執行緒工廠 * @param producerType 生產者執行緒的設定,當你只有一個生產者執行緒時設定為 ProducerType.SINGLE,多個生產者執行緒ProducerType.MULTI * @param waitStrategy消費者的等待策略 */ public Disruptor( final EventFactory<T> eventFactory, final int ringBufferSize, final ThreadFactory threadFactory, final ProducerType producerType, final WaitStrategy waitStrategy) { this( RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy), new BasicExecutor(threadFactory)); }
上面的消費者等待策略有以下:
BlockingWaitStrategy: 使用鎖和條件變數。CPU資源的佔用少,延遲大;
SleepingWaitStrategy: 在多次迴圈嘗試不成功後,選擇讓出CPU,等待下次排程,多次排程後仍不成功,嘗試前睡眠一個納秒級別的時間再嘗試。這種策略平衡了延遲和CPU資源佔用,但延遲不均勻。
YieldingWaitStrategy: 在多次迴圈嘗試不成功後,通過Thread.yield()讓出CPU,等待下次排程。效能和CPU資源佔用上較為平衡,但要注意使用該策略時消費者執行緒最好小於CPU的核心數
BusySpinWaitStrategy: 效能最高的一種,一直不停的自旋等待,獲取資源。可以壓榨出最高的效能,但會佔用最多的CPU資源
PhasedBackoffWaitStrategy: 上面多種策略的綜合,CPU資源的佔用少,延遲大。
2、handleEventsWith與handleEventsWithWorkerPool的區別
這兩個方法區別主要就是在於是否重複消費佇列中的訊息,前者載入的不同消費者會各自對訊息進行消費,各個消費者之間不存在競爭。後者消費者對於佇列中的同一條訊息不重複消費;
二、效能對比
上面我們對三種阻塞佇列做了一個基本的介紹,下面我們分別對它們進行效能上的測試與比對,看下ArrayBlcokingQueue與LinkedBlockingQueue效能上有哪些差別,而Disruptor是否像說的那樣具備很高的併發效能
首先我們構造一個加單的訊息事件實體
public class InfoEvent implements Serializable { private static final long serialVersionUID = 1L; private long id; private String value; public InfoEvent() { } public InfoEvent(long id, String value) { this.id = id; this.value = value; } public long getId() { return id; } public void setId(long id) { this.id = id; } public String getValue() { return value; } public void setValue(String value) { this.value = value; } }
定義事件工廠
public class InfoEventFactory implements EventFactory<InfoEvent>{ public InfoEvent newInstance() { return new InfoEvent(); } }
定義Disruptor的消費者
public class InfoEventConsumer implements WorkHandler<InfoEvent> { private long startTime; private int cnt; public InfoEventConsumer() { this.startTime = System.currentTimeMillis(); } @Override public void onEvent(InfoEvent event) throws Exception { // TODO Auto-generated method stub cnt++; if (cnt == DisruptorTest.infoNum) { long endTime = System.currentTimeMillis(); System.out.println(" 消耗時間: " + (endTime - startTime) + "毫秒"); } } }
接下來分別針對ArrayBlockingQueue、LinkedBlockingQueue與Disruptor編寫測試程式
ArrayBlcokingQueueTest
public class ArrayBlcokingQueueTest { public static int infoNum = 5000000; public static void main(String[] args) { final BlockingQueue<InfoEvent> queue = new ArrayBlockingQueue<InfoEvent>(100); final long startTime = System.currentTimeMillis(); new Thread(new Runnable() { @Override public void run() { int pcnt = 0; while (pcnt < infoNum) { InfoEvent kafkaInfoEvent = new InfoEvent(pcnt, pcnt+"info"); try { queue.put(kafkaInfoEvent); } catch (InterruptedException e) { e.printStackTrace(); } pcnt++; } } }).start(); new Thread(new Runnable() { @Override public void run() { int cnt = 0; while (cnt < infoNum) { try { queue.take(); } catch (InterruptedException e) { e.printStackTrace(); } cnt++; } long endTime = System.currentTimeMillis(); System.out.println("消耗時間 : " + (endTime - startTime) + "毫秒"); } }).start(); } }
LinkedBlockingQueueTest
public class LinkedBlockingQueueTest { public static int infoNum = 50000000; public static void main(String[] args) { final BlockingQueue<InfoEvent> queue = new LinkedBlockingQueue<InfoEvent>(); final long startTime = System.currentTimeMillis(); new Thread(new Runnable() { @Override public void run() { int pcnt = 0; while (pcnt < infoNum) { InfoEvent kafkaInfoEvent = new InfoEvent(pcnt, pcnt + "info"); try { queue.put(kafkaInfoEvent); } catch (InterruptedException e) { e.printStackTrace(); } pcnt++; } } }).start(); new Thread(new Runnable() { @Override public void run() { int cnt = 0; while (cnt < infoNum) { try { queue.take(); } catch (InterruptedException e) { e.printStackTrace(); } cnt++; } long endTime = System.currentTimeMillis(); System.out.println("消耗時間: " + (endTime - startTime) + "毫秒"); } }).start(); } }
DisruptorTest
public class DisruptorTest { public static int infoNum = 5000000; @SuppressWarnings("unchecked") public static void main(String[] args) { InfoEventFactory factory = new InfoEventFactory(); int ringBufferSize = 65536; //資料緩衝區的大小 必須為2的次冪 /** * *factory,定義的事件工廠 *ringBufferSize,環形佇列RingBuffer的大小,必須是2的N次方 *ProducerType,生產者執行緒的設定,當你只有一個生產者執行緒時設定為 ProducerType.SINGLE,多個生產者執行緒ProducerType.MULTI *waitStrategy,消費者的等待策略 * */ final Disruptor<InfoEvent> disruptor = new Disruptor<InfoEvent>(factory, ringBufferSize, DaemonThreadFactory.INSTANCE, ProducerType.SINGLE, new YieldingWaitStrategy()); InfoEventConsumer consumer = new InfoEventConsumer(); disruptor.handleEventsWithWorkerPool(consumer); disruptor.start(); new Thread(new Runnable() { @Override public void run() { RingBuffer<InfoEvent> ringBuffer = disruptor.getRingBuffer(); for (int i = 0; i < infoNum; i++) { long seq = ringBuffer.next(); InfoEvent infoEvent = ringBuffer.get(seq); infoEvent.setId(i); infoEvent.setValue("info" + i); ringBuffer.publish(seq); } } }).start(); } }
我們在十萬、百萬、千萬三個數量級上,分別對ArrayBlockingQueue,LinkedBlockingQueue初始化為無界和有界佇列,Disruptor的BlockingWaitStrategy和YieldingWaitStrategy,進行三次測試,生產者與消費者均在單執行緒模式下執行,對結果進行統計記錄;
測試環境:
作業系統:win7 64位,CPU:Intel Core i7-3250M 2.9GHz ,記憶體:8G,JDK:1.8,disruptor版本:3.4.2
五十萬資料
第一次 |
第二次 |
第三次 |
|
ArrayBlcokingQueue |
229ms |
233ms |
253ms |
LinkedBlockingQueue (無界) |
211ms |
207ms |
202ms |
LinkedBlockingQueue (有界) |
265ms |
207ms |
256ms |
Disruptor ( BlockingWaitStrategy ) |
71ms |
56ms |
65ms |
Disruptor ( YieldingWaitStrategy ) |
56ms |
48ms |
49ms |
五百萬資料
第一次 |
第二次 |
第三次 |
|
ArrayBlcokingQueue |
1530ms |
1603ms |
1576ms |
LinkedBlockingQueue (無界) |
1369ms |
1390ms |
1409ms |
LinkedBlockingQueue (有界) |
1408ms |
1397ms |
1494ms |
Disruptor ( BlockingWaitStrategy ) |
345ms |
363ms |
357ms |
Disruptor ( YieldingWaitStrategy ) |
104ms |
108ms |
107ms |
五千萬資料
第一次 |
第二次 |
第三次 |
|
ArrayBlcokingQueue |
14799ms |
14928ms |
15122ms |
LinkedBlockingQueue (無界) |
14226ms |
14008ms |
13518ms |
LinkedBlockingQueue (有界) |
14039ms |
14434ms |
13839ms |
Disruptor ( BlockingWaitStrategy ) |
2972ms |
2910ms |
2848ms |
Disruptor ( YieldingWaitStrategy ) |
699ms |
742ms |
698ms |
然後我對程式進行了修改,讓測試程式持續執行,每五千萬輸出一次,對執行期間CPU和記憶體使用情況進行了記錄
ArrayBlcokingQueue
LinkedBlockingQueue(無界)
LinkedBlockingQueue(有界)
Disruptor(BlockingWaitStrategy)
Disruptor(YieldingWaitStrategy)
從上面的測試中我們可以看到ArrayBlcokingQueue與LinkedBlockingQueue效能上區別不是很大,LinkedBlockingQueue由於讀寫鎖的分離,平均效能會稍微好些,但差距並不明顯。
而Disruptor效能表現突出,特別是隨著資料量的增大,優勢會越發明顯。同時在單執行緒生產和消費的應用場景下,相比jdk內建的阻塞佇列,CPU和GC的壓力反而更小。
三、總結
1、ArrayBlcokingQueue與LinkedBlockingQueue,一般認為前者基於陣列實現,初始化後不需要再建立新的物件,但沒有進行鎖分離,所以記憶體GC壓力較小,但效能會相對較低;後者基於連結串列實現,每次都需要建立 一個node物件,會存在頻繁的建立銷燬操作,GC壓力較大,但插入和刪除資料是不同的鎖,進行了鎖分離,效能會相對較好;從測試結果上看,其實兩者效能和GC上差別都不大,在實際運用過程中,我認為一般場景下ArrayBlcokingQueue的效能已經足夠應對,處於對GC壓力的考慮,及潛在的OOM的風險我建議普通情況下使用ArrayBlcokingQueue即可。當然你也可以使用LinkedBlockingQueue,從測試結果上看,它相比ArrayBlcokingQueue效能上有有所提升但並不明顯,結合gc的壓力和潛在OOM的風險,所以結合應用的場景需要綜合考慮。
2、Disruptor做為一款高效能佇列框架,確實足夠優秀,在測試中我們可以看到無論是效能和GC壓力都遠遠好過ArrayBlcokingQueue與LinkedBlockingQueue;如果你追求更高的效能,那麼Disruptor是一個很好的選擇。 但需要注意的是,你需要結合自己的硬體配置和業務場景,正確配置Disruptor,選擇合適的消費策略,這樣不僅可以獲取較高的效能,同時可以保證硬體資源的合理分配。
3、對這三種阻塞佇列的測試,並不是為了比較孰優孰劣,主要是為了加強理解,實際的業務應用需要根據情況合理進行選擇。這裡只是結合自己的使用,對它們進行一個簡單的總結,並沒有進行較深入的探究,如有錯誤的的地方還請指正與海涵。
<em> </em>
<em> </em>
<em id="__mceDel"> </em>