1. 程式人生 > >阻塞隊列(BlockingQueue)

阻塞隊列(BlockingQueue)

arr except class 範圍 shu AR 數據 rup move

阻塞隊列是 java.util.concurrent 包提供的一個類,該類提供了多線程中通過隊列實現安全高效的數據處理的功能。

所謂阻塞隊列,是在普通隊列基礎上實現了阻塞線程的功能:

  •   隊列為空時,獲取元素的線程阻塞,直到隊列變為非空。
  •   當隊列滿時,存儲元素的線程阻塞,直到隊列可用(非滿)。

以下是阻塞隊列實現阻塞線程的兩種常用場景:

技術分享圖片技術分享圖片

阻塞隊列提供的方法:

  插入方法:

    1. boolean add(E e):隊列沒有滿,則插入數據並返回true;隊列滿時,拋出異常 java.lang.IllegalStateException: Queue full。

    2. boolean offer(E e):隊列沒有滿,則插入數據並返回true;隊列滿時,返回false。

    3. void put(E e):隊列沒有滿,則插入數據;隊列滿時,阻塞調用此方法線程,直到隊列有空閑空間時此線程進入就緒狀態。

    4. boolean offer(E e, long timeout, TimeUnit unit):隊列沒有滿,插入數據並返回true;隊列滿時,阻塞調用此方法線程,若指定等待的時間內還不能往隊列中插入數據,返回false。

  移除方法:

    1. E remove():隊列非空,則以FIFO原則移除數據,並返回該數據的值;隊列為空,拋出異常

java.util.NoSuchElementException。

    2. E poll():隊列非空,移除數據,並返回該數據的值;隊列為空,返回null。

    3. E take():隊列非空,移除數據,並返回該數據的值;隊列為空,阻塞調用此方法線程,直到隊列為非空時此線程進入就緒狀態。

    4. E poll(long timeout, TimeUnit unit):隊列非空,移除數據,並返回該數據的值;隊列為空,阻塞調用此方法線程,若指定等待的時間內隊列都沒有數據可取,返回null。

  檢查方法:

    1. E element():隊列非空,則返回隊首元素;隊列為空,拋出異常 java.util.NoSuchElementException。

    2. E peek():隊列非空,則返回隊首元素;隊列為空,返回null。

  獲取所有成員的方法:

    1. int drainTo(Collection<? super E> c):一次性從BlockingQueue獲取所有可用的數據對象存入集合中。

    2. int drainTo(Collection<? super E> c, int maxElements):從BlockingQueue獲取指定數據的個數的對象存入集合中。

JDK提供的阻塞隊列:

  1. ArrayBlockingQueue :一個由數組結構實現的有界阻塞隊列。

    ArrayBlockingQueue內部,維護了一個定長數組。此隊列按照先進先出(FIFO)的原則對元素進行排序。默認情況下對象內部采用非公平鎖,所謂公平鎖是指阻塞的所有生產者線程(或消費者線程),當隊列可用時,可以按照阻塞的先後順序訪問隊列,即先阻塞的生產者線程,可以先往隊列裏插入元素(先阻塞的消費者線程,可以先從隊列裏獲取元素)。通常情況下為了保證公平性會降低吞吐量。

  2. LinkedBlockingQueue :一個由鏈表結構實現的有界阻塞隊列。

    LinkedBlockingQueue是一個用鏈表實現的有界阻塞隊列。此隊列的默認和最大長度為Integer.MAX_VALUE。按照先進先出的原則對元素進行排序。

  3. DelayQueue:一個使用優先級隊列實現的無界阻塞隊列。

    DelayQueue是一個支持延時獲取元素的無界阻塞隊列。隊列使用PriorityQueue來實現。隊列中的元素必須實現Delayed接口,在創建元素時可以指定多久才能從隊列中獲取當前元素。只有在延遲期滿時才能從隊列中提取元素。使用場景:常見的例子比如使用一個DelayQueue來管理一個超時未響應的連接隊列。

  4. PriorityBlockingQueue :一個支持優先級排序的無界阻塞隊列。

    PriorityBlockingQueue是一個支持優先級的無界隊列。默認情況下元素采取自然順序排列,也可以通過比較器comparator來指定元素的排序規則。元素按照升序排列。

  5. SynchronousQueue:一個不存儲元素的阻塞隊列。

    SynchronousQueue是一個不存儲元素的阻塞隊列。每一個put操作必須等待一個take操作,否則不能繼續添加元素。

  6. LinkedTransferQueue:一個由鏈表結構實現的無界阻塞隊列。

    LinkedTransferQueue是一個由鏈表結構組成的無界阻塞TransferQueue隊列。相對於其他阻塞隊列LinkedTransferQueue多了tryTransfer和transfer方法。

  7. LinkedBlockingDeque:一個由鏈表結構實現的雙向阻塞隊列。

    LinkedBlockingDeque是一個由鏈表結構組成的雙向阻塞隊列。所謂雙向隊列指的你可以從隊列的兩端插入和移出元素。雙端隊列因為多了一個操作隊列的入口,在多線程同時入隊時,也就減少了一半的競爭。

阻塞隊列常用場景是 “生產者—消費者” 模式,以下是一個生產者不斷生產隨機數據存入隊列,消費者不斷獲取的實例:

import java.util.Random;
import java.util.concurrent.*;

public class BlockingQueueTest1 {

    /**
     * 生產者
     */
    public static class Producer implements Runnable {

        /**
         * 阻塞隊列
         */
        private BlockingQueue<Integer> blockingQueue;

        /**
         * 判斷是否循環
         */
        private boolean isRunning = true;

        /**
         * 隨機數據範圍
         */
        private static final int RANGE_FOR_DATA = 1000;

        private Random random = new Random();

        public Producer(BlockingQueue<Integer> blockingQueue) {
            this.blockingQueue = blockingQueue;
        }

        @Override
        public void run() {
            while (isRunning) {
                try {
                    /** 生產出隨機數 */
                    int data = random.nextInt(RANGE_FOR_DATA);
                    System.out.println(Thread.currentThread().getName() + " 生產數據:" + data);
                    /** 將隨機數放入阻塞隊列 */
                    blockingQueue.put(data);
                    System.out.println(Thread.currentThread().getName() + " 插入隊列:" + data);
                    /** 進行隨機時間休眠 */
                    Thread.sleep(random.nextInt(1000));
                } catch (InterruptedException e) {
                    System.out.println("程序結束啦,我不用再等待阻塞隊列有空余位置了!");
                }
            }
        }

        /**
         * 終止生產線程
         */
        public void shutDown() {
            isRunning = false;
        }
    }

    /**
     * 消費者
     */
    public static class Consumer implements Runnable {

        /**
         * 阻塞隊列
         */
        private BlockingQueue<Integer> blockingQueue;

        /**
         * 判斷是否循環
         */
        private boolean isRunning = true;

        /**
         * 隨機數據範圍
         */
        private Random random = new Random();

        public Consumer(BlockingQueue<Integer> blockingQueue) {
            this.blockingQueue = blockingQueue;
        }

        @Override
        public void run() {
            while (isRunning) {
                try {
                    /** 從阻塞隊列中獲取隨機數 */
                    int data = (int) blockingQueue.take();
                    System.out.println(Thread.currentThread().getName() + " 消費數據:" + data);
                    /** 進行隨機時間休眠 */
                    Thread.sleep(random.nextInt(1000));
                } catch (InterruptedException e) {
                    System.out.println("程序結束啦,我不用再等待阻塞隊列非空了!");
                }
            }
        }

        /**
         * 終止消費線程
         */
        public void shutDown() {
            isRunning = false;
        }
    }

    public static void main(String[] args) {
        /** 創建容量大小為5的阻塞隊列 */
        BlockingQueue<Integer> blockingQueue = new LinkedBlockingQueue<>(5);
        /** 創建連接池 */
        ExecutorService pool = Executors.newCachedThreadPool();
        /** 創建生產線程,消費線程各5個 */
        Producer[] producers = new Producer[5];
        Consumer[] consumers = new Consumer[5];
        /** 實例化生產線程與消費線程並且執行線程 */
        for (int i = 0; i < producers.length; i++) {
            producers[i] = new Producer(blockingQueue);
            consumers[i] = new Consumer(blockingQueue);
            pool.execute(producers[i]);
            pool.execute(consumers[i]);
        }
        try {
            /** 等待5秒後進行手動中斷 */
            Thread.sleep(5 * 1000);
            for (int i = 0; i < producers.length; i++) {
                producers[i].shutDown();
                consumers[i].shutDown();
            }
            /** 其實提不提醒線程關閉都一個樣了,阻塞的線程,不會因為手動中斷而中斷的 */
            pool.shutdown();
            /** 等待2秒,若還有線程沒有關閉則強行中斷所有等待線程 */
            if (!pool.awaitTermination(2 * 1000, TimeUnit.MILLISECONDS)) {
                /** 超時的時候向線程池中所有的線程發出中斷 */
                pool.shutdownNow();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

阻塞隊列(BlockingQueue)