1. 程式人生 > >ArrayBlcokingQueue,LinkedBlockingQueue與Disruptor三種隊列對比與分析

ArrayBlcokingQueue,LinkedBlockingQueue與Disruptor三種隊列對比與分析

策略 fin font ins idt 過程 毫秒 ringbuf 鏈表

一、基本介紹

ArrayBlcokingQueue,LinkedBlockingQueue是jdk中內置的阻塞隊列,網上對它們的分析已經很多,主要有以下幾點:

1、底層實現機制不同,ArrayBlcokingQueue是基於數組的,LinkedBlockingQueue是基於鏈表的;

2、初始化方式不同,ArrayBlcokingQueue是有界的,初始化時必須指定隊列的大小;LinkedBlockingQueue可以是無界的,但如果初始化時指定了隊列大小,也可以做為有界隊列使用;

3、鎖機制實現不同,ArrayBlcokingQueue生產和消費使用的是同一把鎖,並沒有做鎖分離;LinkedBlockingQueue中生產、消費分別通過putLock與takeLock保證同步,進行了鎖的分離;

使用的過程中,根據應該場景提供了可選插入和刪除策略,我們需要掌握和區分

1、插入操作

//隊列未滿時,返回true;隊列滿則拋出IllegalStateException(“Queue full”)異常
add(e);
//隊列未滿時,直接插入沒有返回值;隊列滿時會阻塞等待,一直等到隊列未滿時再插入。 put(e); //隊列未滿時,返回true;隊列滿時返回false。非阻塞立即返回。 offer(e); //設定等待的時間,如果在指定時間內還不能往隊列中插入數據則返回false,插入成功返回true。 offer(e, timeout, unit);

2、刪除操作

//隊列不為空時,返回隊首值並移除;隊列為空時拋出NoSuchElementException()異常
remove();
//隊列不為空返回隊首值並移除;當隊列為空時會阻塞等待,一直等到隊列不為空時再返回隊首值。
queue.take();
//隊列不為空時返回隊首值並移除;隊列為空時返回null。非阻塞立即返回。? queue.poll(); //設定等待的時間,如果在指定時間內隊列還未孔則返回null,不為空則返回隊首值? queue.poll(timeout, unit)

Disruptor框架是由LMAX公司開發的一款高效的無鎖內存隊列。

Disruptor的最大特點就是高性能,它的內部與眾不同的使用了環形隊列(RingBuffer)來代替普通的線型隊列,相比普通隊列環形隊列不需要針對性的同步head和tail頭尾指針,減少了線程協作的復雜度,再加上它本身基於無鎖操作的特性,從而可以達到了非常高的性能;

在使用Disruptor框架時,我們需要註意以下幾個方面

1、Disruptor的構造

    /**
     * 
     *
     * @param eventFactory   定義的事件工廠
     * @param ringBufferSize  環形隊列RingBuffer的大小,必須是2的N次方
     * @param threadFactory  消費者線程工廠
     * @param producerType 生產者線程的設置,當你只有一個生產者線程時設置為 ProducerType.SINGLE,多個生產者線程ProducerType.MULTI
     * @param waitStrategy  消費者的等待策略
     */
    public Disruptor(
            final EventFactory<T> eventFactory,
            final int ringBufferSize,
            final ThreadFactory threadFactory,
            final ProducerType producerType,
            final WaitStrategy waitStrategy)
    {
        this(
            RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy),
            new BasicExecutor(threadFactory));
    }

上面的消費者等待策略有以下:

BlockingWaitStrategy: 使用鎖和條件變量。CPU資源的占用少,延遲大;

SleepingWaitStrategy: 在多次循環嘗試不成功後,選擇讓出CPU,等待下次調度,多次調度後仍不成功,嘗試前睡眠一個納秒級別的時間再嘗試。這種策略平衡了延遲和CPU資源占用,但延遲不均勻。

YieldingWaitStrategy: 在多次循環嘗試不成功後,通過Thread.yield()讓出CPU,等待下次調度。性能和CPU資源占用上較為平衡,但要註意使用該策略時消費者線程最好小於CPU的核心數

BusySpinWaitStrategy: 性能最高的一種,一直不停的自旋等待,獲取資源。可以壓榨出最高的性能,但會占用最多的CPU資源

PhasedBackoffWaitStrategy: 上面多種策略的綜合,CPU資源的占用少,延遲大。

2、handleEventsWith與handleEventsWithWorkerPool的區別

這兩個方法區別主要就是在於是否重復消費隊列中的消息,前者加載的不同消費者會各自對消息進行消費,各個消費者之間不存在競爭。後者消費者對於隊列中的同一條消息不重復消費;

二、性能對比

上面我們對三種阻塞隊列做了一個基本的介紹,下面我們分別對它們進行性能上的測試與比對,看下ArrayBlcokingQueue與LinkedBlockingQueue性能上有哪些差別,而Disruptor是否像說的那樣具備很高的並發性能

首先我們構造一個加單的消息事件實體

    public class InfoEvent implements Serializable {
    private static final long serialVersionUID = 1L;
    private long id;
    private String value;

    public InfoEvent() {

    }

    public InfoEvent(long id, String value) {
        this.id = id;
        this.value = value;
    }

    public long getId() {
        return id;
    }

    public void setId(long id) {
        this.id = id;
    }

    public String getValue() {
        return value;
    }

    public void setValue(String value) {
        this.value = value;
    }
}

定義事件工廠

public class InfoEventFactory implements EventFactory<InfoEvent>{
    public InfoEvent newInstance() {
        return new InfoEvent();
    }
 
}

定義Disruptor的消費者

public class InfoEventConsumer implements WorkHandler<InfoEvent> {
    private long startTime;
    private int cnt;

    public InfoEventConsumer() {
        this.startTime = System.currentTimeMillis();
    }

    @Override
    public void onEvent(InfoEvent event) throws Exception {
        // TODO Auto-generated method stub
        cnt++;

        if (cnt == DisruptorTest.infoNum) {
            long endTime = System.currentTimeMillis();
            System.out.println(" 消耗時間: " + (endTime - startTime) + "毫秒");
        }

    }
}

接下來分別針對ArrayBlockingQueue、LinkedBlockingQueue與Disruptor編寫測試程序

ArrayBlcokingQueueTest

public class ArrayBlcokingQueueTest {
    public static int infoNum = 5000000;
    public static void main(String[] args) {
        final BlockingQueue<InfoEvent> queue = new ArrayBlockingQueue<InfoEvent>(100);
        final long startTime = System.currentTimeMillis();
        new Thread(new Runnable() {

            @Override
            public void run() {
                int pcnt = 0;
                while (pcnt < infoNum) {
                    InfoEvent kafkaInfoEvent = new InfoEvent(pcnt, pcnt+"info");
                    try {
                        queue.put(kafkaInfoEvent);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    pcnt++;
                }
            }
        }).start();

        new Thread(new Runnable() {

            @Override
            public void run() {
                int cnt = 0;
                while (cnt < infoNum) {
                    try {
                        queue.take();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    cnt++;
                }
                long endTime = System.currentTimeMillis();
                System.out.println("消耗時間 : " + (endTime - startTime) + "毫秒");
            }
        }).start();
    }
}

LinkedBlockingQueueTest

public class LinkedBlockingQueueTest {
    
    public static int infoNum = 50000000;

    public static void main(String[] args) {
        final BlockingQueue<InfoEvent> queue = new LinkedBlockingQueue<InfoEvent>();
        final long startTime = System.currentTimeMillis();
        new Thread(new Runnable() {
            @Override
            public void run() {
                int pcnt = 0;
                while (pcnt < infoNum) {
                    InfoEvent kafkaInfoEvent = new InfoEvent(pcnt, pcnt + "info");
                    try {
                        queue.put(kafkaInfoEvent);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    pcnt++;
                }
            }
        }).start();

        new Thread(new Runnable() {

            @Override
            public void run() {
                int cnt = 0;
                while (cnt < infoNum) {
                    try {
                        queue.take();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    cnt++;
                }
                long endTime = System.currentTimeMillis();
                System.out.println("消耗時間: " + (endTime - startTime) + "毫秒");
            }
        }).start();
    }

}

DisruptorTest

public class DisruptorTest {
    public static int infoNum = 5000000;
    @SuppressWarnings("unchecked")
    public static void main(String[] args) {
        InfoEventFactory factory = new InfoEventFactory();
        int ringBufferSize = 65536; //數據緩沖區的大小 必須為2的次冪
        
        /**
         * 
         *  factory,定義的事件工廠
         *  ringBufferSize,環形隊列RingBuffer的大小,必須是2的N次方
         *  ProducerType,生產者線程的設置,當你只有一個生產者線程時設置為 ProducerType.SINGLE,多個生產者線程ProducerType.MULTI
         *  waitStrategy,消費者的等待策略  
         *  
         */
        final Disruptor<InfoEvent> disruptor = new Disruptor<InfoEvent>(factory, ringBufferSize,
                DaemonThreadFactory.INSTANCE, ProducerType.SINGLE, new YieldingWaitStrategy());

        InfoEventConsumer consumer = new InfoEventConsumer();
        disruptor.handleEventsWithWorkerPool(consumer);
        disruptor.start();
        new Thread(new Runnable() {
            @Override
            public void run() {
                RingBuffer<InfoEvent> ringBuffer = disruptor.getRingBuffer();
                for (int i = 0; i < infoNum; i++) {
                    long seq = ringBuffer.next();
                    InfoEvent infoEvent = ringBuffer.get(seq);
                    infoEvent.setId(i);
                    infoEvent.setValue("info" + i);
                    ringBuffer.publish(seq);
                }
            }
        }).start();
    }
}

我們在十萬、百萬、千萬三個數量級上,分別對ArrayBlockingQueue,LinkedBlockingQueue初始化為無界和有界隊列,Disruptor的BlockingWaitStrategy和YieldingWaitStrategy,進行三次測試,生產者與消費者均在單線程模式下運行,對結果進行統計記錄;

測試環境:

操作系統:win7 64位,CPU:Intel Core i7-3250M 2.9GHz ,內存:8G,JDK:1.8,disruptor版本:3.4.2

五十萬數據

第一次

第二次

第三次

ArrayBlcokingQueue

229ms

233ms

253ms

LinkedBlockingQueue(無界)

211ms

207ms

202ms

LinkedBlockingQueue(有界)

265ms

207ms

256ms

DisruptorBlockingWaitStrategy

71ms

56ms

65ms

DisruptorYieldingWaitStrategy

56ms

48ms

49ms

五百萬數據

第一次

第二次

第三次

ArrayBlcokingQueue

1530ms

1603ms

1576ms

LinkedBlockingQueue(無界)

1369ms

1390ms

1409ms

LinkedBlockingQueue(有界)

1408ms

1397ms

1494ms

DisruptorBlockingWaitStrategy

345ms

363ms

357ms

DisruptorYieldingWaitStrategy

104ms

108ms

107ms

五千萬數據

第一次

第二次

第三次

ArrayBlcokingQueue

14799ms

14928ms

15122ms

LinkedBlockingQueue(無界)

14226ms

14008ms

13518ms

LinkedBlockingQueue(有界)

14039ms

14434ms

13839ms

DisruptorBlockingWaitStrategy

2972ms

2910ms

2848ms

DisruptorYieldingWaitStrategy

699ms

742ms

698ms


然後我對程序進行了修改,讓測試程序持續運行,每五千萬輸出一次,對運行期間CPU和內存使用情況進行了記錄

ArrayBlcokingQueue
技術分享圖片


LinkedBlockingQueue(無界)
技術分享圖片


LinkedBlockingQueue(有界)
技術分享圖片


Disruptor(BlockingWaitStrategy)

技術分享圖片
Disruptor(YieldingWaitStrategy)
技術分享圖片

從上面的測試中我們可以看到ArrayBlcokingQueue與LinkedBlockingQueue性能上區別不是很大,LinkedBlockingQueue由於讀寫鎖的分離,平均性能會稍微好些,但差距並不明顯。
而Disruptor
性能表現突出,特別是隨著數據量的增大,優勢會越發明顯。同時在單線程生產和消費的應用場景下,相比jdk內置的阻塞隊列,CPU和GC的壓力反而更小。

三、總結

1、ArrayBlcokingQueue與LinkedBlockingQueue,一般認為前者基於數組實現,初始化後不需要再創建新的對象,但沒有進行鎖分離,所以內存GC壓力較小,但性能會相對較低;後者基於鏈表實現,每次都需要創建 一個node對象,會存在頻繁的創建銷毀操作,GC壓力較大,但插入和刪除數據是不同的鎖,進行了鎖分離,性能會相對較好;從測試結果上看,其實兩者性能和GC上差別都不大,在實際運用過程中,我認為一般場景下ArrayBlcokingQueue的性能已經足夠應對,處於對GC壓力的考慮,及潛在的OOM的風險我建議普通情況下使用ArrayBlcokingQueue即可。當然你也可以使用LinkedBlockingQueue,從測試結果上看,它相比ArrayBlcokingQueue性能上有有所提升但並不明顯,結合gc的壓力和潛在OOM的風險,所以結合應用的場景需要綜合考慮。

2、Disruptor做為一款高性能隊列框架,確實足夠優秀,在測試中我們可以看到無論是性能和GC壓力都遠遠好過ArrayBlcokingQueue與LinkedBlockingQueue;如果你追求更高的性能,那麽Disruptor是一個很好的選擇。
但需要註意的是,你需要結合自己的硬件配置和業務場景,正確配置Disruptor,選擇合適的消費策略,這樣不僅可以獲取較高的性能,同時可以保證硬件資源的合理分配。

3、對這三種阻塞隊列的測試,並不是為了比較孰優孰劣,主要是為了加強理解,實際的業務應用需要根據情況合理進行選擇。這裏只是結合自己的使用,對它們進行一個簡單的總結,並沒有進行較深入的探究,如有錯誤的的地方還請指正與海涵。
 
 
 


 
 
 

ArrayBlcokingQueue,LinkedBlockingQueue與Disruptor三種隊列對比與分析