1. 程式人生 > >多執行緒程式設計學習六(Java 中的阻塞佇列).

多執行緒程式設計學習六(Java 中的阻塞佇列).

介紹

阻塞佇列(BlockingQueue)是指當佇列滿時,佇列會阻塞插入元素的執行緒,直到佇列不滿;當佇列空時,佇列會阻塞獲得元素的執行緒,直到佇列變非空。阻塞佇列就是生產者用來存放元素、消費者用來獲取元素的容器。

當執行緒 插入/獲取 動作由於佇列 滿/空 阻塞後,佇列也提供了一些機制去處理,或丟擲異常,或返回特殊值,或者執行緒一直等待...

方法/處理方式 丟擲異常 返回特殊值 一直阻塞 超時退出
插入方法 add(e) offer(e) put(e) offer(e, timeout, unit)
移除方法 remove(o) poll() take() poll(timeout, unit)
檢查方法 element() peek() — 不移除元素 不可用 不可用

tips: 如果是無界阻塞佇列,則 put 方法永遠不會被阻塞;offer 方法始終返回 true。

Java 中的阻塞佇列:

ArrayBlockingQueue

ArrayBlockingQueue 是一個用陣列實現的有界阻塞佇列。此佇列按照先進先出(FIFO)的原則對元素進行排序,預設情況下不保證執行緒公平的訪問。

通過可重入的獨佔鎖 ReentrantLock 來控制併發,Condition 來實現阻塞。

public class ArrayBlockingQueueTest {

    /**
     * 1. 由於是有界阻塞佇列,需要設定初始大小
     * 2. 預設不保證阻塞執行緒的公平訪問,可設定公平性
     */
    private static ArrayBlockingQueue<String> QUEUE = new ArrayBlockingQueue<>(2, true);

    public static void main(String[] args) throws InterruptedException {

        Thread put = new Thread(() -> {
            // 3. 嘗試插入元素
            try {
                QUEUE.put("java");
                QUEUE.put("javaScript");
                // 4. 元素已滿,會阻塞執行緒
                QUEUE.put("c++");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        put.start();
        Thread take = new Thread(() -> {
            try {
                // 5. 獲取一個元素
                System.out.println(QUEUE.take());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        take.start();
        // 6 javaScript、c++
        System.out.println(QUEUE.take());
        System.out.println(QUEUE.take());
    }
}

LinkedBlockingQueue

LinkedBlockingQueue 是一個用單向連結串列實現的有界阻塞佇列。此佇列的預設和最大長度為 Integer.MAX_VALUE。此佇列按照先進先出的原則對元素進行排序。

和 ArrayBlockingQueue 一樣,採用 ReentrantLock 來控制併發,不同的是它使用了兩個獨佔鎖來控制消費和生產,通過 takeLock 和 putLock 兩個鎖來控制生產和消費,互不干擾,只要佇列未滿,生產執行緒可以一直生產;只要佇列不空,消費執行緒可以一直消費,不會相互因為獨佔鎖而阻塞。

tips:因為使用了雙鎖,避免併發計算不準確,使用了一個 AtomicInteger 變數統計元素總量。

LinkedBlockingDeque

LinkedBlockingDeque 是一個由雙向連結串列結構組成的有界阻塞佇列,可以從佇列的兩端插入和移出元素。它實現了BlockingDeque介面,多了addFirst、addLast、offerFirst、offerLast、peekFirst和peekLast等方法,以 First 單詞結尾的方法,表示插入、獲取或移除雙端佇列的第一個元素。以 Last 單詞結尾的方法,表示插入、獲取或移除雙端佇列的最後一個元素。

LinkedBlockingDeque 的 Node 實現多了指向前一個節點的變數 prev,以此實現雙向佇列。併發控制上和 ArrayBlockingQueue 類似,採用單個 ReentrantLock 來控制併發。因為雙端佇列頭尾都可以消費和生產,所以使用了一個共享鎖。

雙向阻塞佇列可以運用在“工作竊取”模式中。

public class LinkedBlockingDequeTest {

    private static LinkedBlockingDeque<String> DEQUE = new LinkedBlockingDeque<>(2);

    public static void main(String[] args) {
        DEQUE.addFirst("java");
        DEQUE.addFirst("c++");
        // java
        System.out.println(DEQUE.peekLast());
        // java
        System.out.println(DEQUE.pollLast());
        DEQUE.addLast("php");
        // c++
        System.out.println(DEQUE.pollFirst());
    }
}

tips: take() 方法呼叫的是 takeFirst(),使用時候需注意。

PriorityBlockingQueue

PriorityBlockingQueue 是一個底層由陣列實現的無界阻塞佇列,並帶有排序功能。由於是無界佇列,所以插入永遠不會被阻塞。預設情況下元素採取自然順序升序排列。也可以自定義類實現 compareTo()方法來指定元素排序規則,或者初始化 PriorityBlockingQueue 時,指定構造引數 Comparator 來對元素進行排序。

底層同樣採用 ReentrantLock 來控制併發,由於只有獲取會阻塞,所以只採用一個Condition(只通知消費)來實現。

public class PriorityBlockingQueueTest {

    private static PriorityBlockingQueue<String> QUEUE = new PriorityBlockingQueue<>();

    public static void main(String[] args) {
        QUEUE.add("java");
        QUEUE.add("javaScript");
        QUEUE.add("c++");
        QUEUE.add("python");
        QUEUE.add("php");
        Iterator<String> it = QUEUE.iterator();
        while (it.hasNext()) {
            // c++  javaScript  java  python  php
            // 同優先順序不保證排序順序
            System.out.print(it.next() + "  ");
        }
    }
}

DelayQueue

DelayQueue 是一個支援延時獲取元素的無界阻塞佇列。佇列使用 PriorityQueue 來實現。佇列中的元素必須實現 Delayed 介面,元素按延遲優先順序排序,延遲時間短的排在前面,只有在延遲期滿時才能從佇列中提取元素。

和 PriorityBlockingQueue 相似,底層也是陣列,採用一個 ReentrantLock 來控制併發。

應用場景:

  1. 快取系統的設計:可以用 DelayQueue 儲存快取元素的有效期,使用一個執行緒迴圈查詢 DelayQueue,一旦能從 DelayQueue 中獲取元素時,表示快取有效期到了。
  2. 定時任務排程:使用 DelayQueue 儲存當天將會執行的任務和執行時間,一旦從 DelayQueue 中獲取到任務就開始執行,比如 TimerQueue 就是使用 DelayQueue 實現的。
public class DelayElement implements Delayed, Runnable {

    private static final AtomicLong SEQUENCER = new AtomicLong();
    /**
     * 標識元素先後順序
     */
    private final long sequenceNumber;
    /**
     * 延遲時間,單位納秒
     */
    private long time;

    public DelayElement(long time) {
        this.time = System.nanoTime() + time;
        this.sequenceNumber = SEQUENCER.getAndIncrement();
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(time - System.nanoTime(), NANOSECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        // compare zero if same object
        if (other == this) {
            return 0;
        }
        if (other instanceof DelayElement) {
            DelayElement x = (DelayElement) other;
            long diff = time - x.time;
            if (diff < 0) {
                return -1;
            } else if (diff > 0) {
                return 1;
            } else if (sequenceNumber < x.sequenceNumber) {
                return -1;
            } else {
                return 1;
            }
        }
        long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
        return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
    }

    @Override
    public void run() {
        System.out.println("sequenceNumber" + sequenceNumber);
    }

    @Override
    public String toString() {
        return "DelayElement{" + "sequenceNumber=" + sequenceNumber + ", time=" + time + '}';
    }
}
public class DelayQueueTest {

    private static DelayQueue<DelayElement> QUEUE = new DelayQueue<>();

    public static void main(String[] args) {
        // 1. 新增 10 個引數
        for (int i = 1; i < 10; i++) {
            // 2. 5 秒內隨機延遲
            int nextInt = new Random().nextInt(5);
            long convert = TimeUnit.NANOSECONDS.convert(nextInt, TimeUnit.SECONDS);
            QUEUE.offer(new DelayElement(convert));
        }
        // 3. 查詢元素排序 —— 延遲短的排在前面
        Iterator<DelayElement> iterator = QUEUE.iterator();
        while (iterator.hasNext()) {
            System.out.println(iterator.next());
        }
        // 4. 可觀察到元素延遲輸出
        while (!QUEUE.isEmpty()) {
            Thread thread = new Thread(QUEUE.poll());
            thread.start();
        }
    }
}

LinkedTransferQueue

LinkedTransferQueue是一個由連結串列結構組成的無界阻塞TransferQueue佇列。

併發控制上採用了大量的 CAS 操作,沒有使用鎖。

相對於其他阻塞佇列,LinkedTransferQueue 多了 tryTransfer 和 transfer 方法。

  1. transfer : Transfers the element to a consumer, waiting if necessary to do so. 存入的元素必須等到有消費者消費才返回。
  2. tryTransfer:Transfers the element to a waiting consumer immediately, if possible. 如果有消費者正在等待消費元素,則把傳入的元素傳給消費者。否則立即返回 false,不用等到消費。

SynchronousQueue

SynchronousQueue 是一個不儲存元素的阻塞佇列。每一個 put 操作必須等待一個 take 操作,否則繼續 put 操作會被阻塞。

SynchronousQueue 預設情況下執行緒採用非公平性策略訪問佇列,未使用鎖,全部通過 CAS 操作來實現併發,吞吐量非常高,高於 LinkedBlockingQueue 和 ArrayBlockingQueue,非常適合用來處理一些高效的傳遞性場景。Executors.newCachedThreadPool() 就使用了 SynchronousQueue 進行任務傳遞。

public class SynchronousQueueTest {

    private static class SynchronousQueueProducer implements Runnable {

        private BlockingQueue<String> blockingQueue;

        private SynchronousQueueProducer(BlockingQueue<String> queue) {
            this.blockingQueue = queue;
        }

        @Override
        public void run() {
            while (true) {
                try {
                    String data = UUID.randomUUID().toString();
                    System.out.println(Thread.currentThread().getName() + " Put: " + data);
                    blockingQueue.put(data);
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    private static class SynchronousQueueConsumer implements Runnable {

        private BlockingQueue<String> blockingQueue;

        private SynchronousQueueConsumer(BlockingQueue<String> queue) {
            this.blockingQueue = queue;
        }

        @Override
        public void run() {
            while (true) {
                try {
                    System.out.println(Thread.currentThread().getName() + " take(): " + blockingQueue.take());
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    public static void main(String[] args) {

        final BlockingQueue<String> synchronousQueue = new SynchronousQueue<>();
        SynchronousQueueProducer queueProducer = new SynchronousQueueProducer(synchronousQueue);
        new Thread(queueProducer, "producer - 1").start();
        SynchronousQueueConsumer queueConsumer1 = new SynchronousQueueConsumer(synchronousQueue);
        new Thread(queueConsumer1, "consumer — 1").start();
        SynchronousQueueConsumer queueConsumer2 = new SynchronousQueueConsumer(synchronousQueue);
        new Thread(queueConsumer2, "consumer — 2").start();
    }
}

 
 

  1. 參考書籍:《Java 併發程式設計的藝術》
  2. 參考博文:https://www.cnblogs.com/konck/p/9473677.html