1. 程式人生 > >Java常見集合框架(十六):Queue之DelayQueue、PriorityQueue、PriorityBlockingQueue

Java常見集合框架(十六):Queue之DelayQueue、PriorityQueue、PriorityBlockingQueue

DelayQueue

public class DelayQueue extends AbstractQueue implements BlockingQueue

  1. Delayed 元素的一個基於優先順序的無界阻塞佇列,只有在延遲期滿時才能從中提取元素。
  2. 如果延遲都還沒有期滿,則佇列沒有頭部,並且 poll 將返回 null。
  3. 不允許使用 null 元素。

成員變數

    /**
     * 可重入的互斥鎖
     */
    private final transient ReentrantLock lock = new ReentrantLock();

    /**
     * 一個基於優先順序堆的無界優先順序佇列。可自然排序。不允許使用 null 元素。
     */
private final PriorityQueue<E> q = new PriorityQueue<E>(); /** * 喚醒或等待take執行緒的條件 */ private final Condition available = lock.newCondition();

構造方法

    /**
     * 建立一個最初為空的新 DelayQueue。
     */
    public DelayQueue() {}

    /**
     * 建立一個最初包含 Delayed 例項的給定 collection 元素的 DelayQueue。
     */
public DelayQueue(Collection<? extends E> c) { this.addAll(c); }

常用方法

boolean add(E e):將指定元素插入此延遲佇列中。

    public boolean add(E e) {
        return offer(e);
    }

boolean offer(E e):將指定元素插入此延遲佇列。

    public boolean offer(E e) {
        final ReentrantLock lock
= this.lock; lock.lock();//阻塞式獲取鎖 try { E first = q.peek();//獲取但不移除此佇列的頭部;如果此佇列為空,則返回 null。 q.offer(e);//將指定的元素插入此優先順序佇列。 if (first == null || e.compareTo(first) < 0) available.signalAll();//佇列中無元素,喚醒take執行緒 return true; } finally { lock.unlock();//釋放鎖 } }

E peek():獲取但不移除此佇列的頭部;如果此佇列為空,則返回 null。

    public E peek() {
        final ReentrantLock lock = this.lock;
        lock.lock();//阻塞式獲取鎖
        try {
            return q.peek();// 獲取但不移除此佇列的頭;如果此佇列為空,則返回 null。
        } finally {
            lock.unlock();//釋放鎖
        }
    }

E poll(): 獲取並移除此佇列的頭,如果此佇列不包含具有已到期延遲時間的元素,則返回 null。

    public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            E first = q.peek();//  獲取但不移除此佇列的頭;如果此佇列為空,則返回 null。
            //getDelay:返回與此物件相關的剩餘延遲時間,以給定的時間單位表示。
            if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
                return null;//佇列為空或元素未到期
            else {
                E x = q.poll();//獲取並移除此佇列的頭,如果此佇列為空,則返回 null。
                assert x != null;//非空校驗
                if (q.size() != 0)//佇列中還有元素
                    available.signalAll();//喚醒take執行緒
                return x;
            }
        } finally {
            lock.unlock();
        }
    }

void put(E e):將指定元素插入此延遲佇列。

    public void put(E e) {
        offer(e);
    }

E take():獲取並移除此佇列的頭部,在可從此佇列獲得到期延遲的元素之前一直等待(如有必要)。

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();//阻塞式獲取鎖,可響應執行緒中斷
        try {
            for (;;) {
                E first = q.peek();//獲取但不移除此佇列的頭;如果此佇列為空,則返回 null。
                if (first == null) {
                    available.await();//佇列為空,執行緒等待,釋放鎖
                } else {
                    long delay =  first.getDelay(TimeUnit.NANOSECONDS);//獲取元素剩餘到期市場,並判斷是否到期
                    if (delay > 0) {
                        long tl = available.awaitNanos(delay);//元素還未到期,執行緒等待指定時間,釋放鎖
                    } else {
                        E x = q.poll();//獲取並移除此佇列的頭,如果此佇列為空,則返回 null。
                        assert x != null;//非空校驗
                        if (q.size() != 0)
                            available.signalAll(); // 執行緒非空,喚醒其它所有take執行緒                        return x;//返回隊頭元素

                    }
                }
            }
        } finally {
            lock.unlock();//釋放鎖
        }
    }

由原始碼看出延遲佇列DelayQueue操作元素是通過PriorityQueue實現的,PriorityQueue是一個基於優先順序堆的無界優先順序佇列。利用可重入的互斥鎖ReentrantLock保證執行緒安全,同時利用Condition保證插入或獲取元素是阻塞的。

PriorityQueue

public class PriorityQueue extends AbstractQueue implements java.io.Serializable

  1. 一個基於優先順序堆的無界優先順序佇列。
  2. 優先順序佇列不允許使用 null 元素。
  3. 預設容量11,當元素容量小於64時,擴容double,否則擴容50%。
  4. 不是同步的。

成員變數

    /**
     * 初始容量
     */
    private static final int DEFAULT_INITIAL_CAPACITY = 11;

    /**
     * 元素以平衡二叉樹形式儲存
     */
    private transient Object[] queue;

    /**
     * 優先順序佇列元素數
     */
    private int size = 0;

    /**
     * 元素自然排序方式
     */
    private final Comparator<? super E> comparator;

    /**
     * 優先順序佇列修改次數
     */
    private transient int modCount = 0;

構造方法

    /**
     *  使用預設的初始容量(11),並根據其自然順序對元素進行排序。
     */
    public PriorityQueue() {
        this(DEFAULT_INITIAL_CAPACITY, null);
    }

     /**
     * 使用指定的初始容量建立Queue,並根據指定的比較器對元素進行排序。
     */
    public PriorityQueue(int initialCapacity,
                         Comparator<? super E> comparator) {
        // Note: This restriction of at least one is not actually needed,
        // but continues for 1.5 compatibility
        if (initialCapacity < 1)
            throw new IllegalArgumentException();
        this.queue = new Object[initialCapacity];
        this.comparator = comparator;
    }

常用方法

boolean add(E e):將指定的元素插入此優先順序佇列。

    public boolean add(E e) {
        return offer(e);
    }

boolean offer(E e): 將指定的元素插入此優先順序佇列。

    public boolean offer(E e) {
        if (e == null)
            throw new NullPointerException();
        modCount++;//修改次數+1
        int i = size;
        if (i >= queue.length)
            grow(i + 1);//元素數可能不夠,需要擴容
        size = i + 1;
        if (i == 0)//佇列為空
            queue[0] = e;
        else
            siftUp(i, e);
        return true;
    }
     /**
     * 佇列擴容
     */
    private void grow(int minCapacity) {
        if (minCapacity < 0) // overflow
            throw new OutOfMemoryError();
    int oldCapacity = queue.length;//獲取當前元素數
        // <64 雙倍擴容; >=64 擴容 50%
        int newCapacity = ((oldCapacity < 64)?
                           ((oldCapacity + 1) * 2):
                           ((oldCapacity / 2) * 3));
        if (newCapacity < 0) // overflow
            newCapacity = Integer.MAX_VALUE;//最大邊界
        if (newCapacity < minCapacity)
            newCapacity = minCapacity;
        queue = Arrays.copyOf(queue, newCapacity);//陣列複製
    }

    //選擇排序方式並插入相應位置
    private void siftUp(int k, E x) {
        if (comparator != null)
            siftUpUsingComparator(k, x);
        else
            siftUpComparable(k, x);
    }

    /**
     * Comparable 方式排序
     */
    private void siftUpComparable(int k, E x) {
        Comparable<? super E> key = (Comparable<? super E>) x;
        while (k > 0) {
            int parent = (k - 1) >>> 1;
            Object e = queue[parent];
            if (key.compareTo((E) e) >= 0)
                break;
            queue[k] = e;
            k = parent;
        }
        queue[k] = key;
    }

     /**
     * Comparator 方式排序
     */
    private void siftUpUsingComparator(int k, E x) {
        while (k > 0) {
            int parent = (k - 1) >>> 1;
            Object e = queue[parent];
            if (comparator.compare(x, (E) e) >= 0)
                break;
            queue[k] = e;
            k = parent;
        }
        queue[k] = x;
    }

E peek(): 獲取但不移除此佇列的頭;如果此佇列為空,則返回 null。

    public E peek() {
        if (size == 0)
            return null;
        return (E) queue[0];
    }

E poll() :獲取並移除此佇列的頭,如果此佇列為空,則返回 null。

    public E poll() {
        if (size == 0)
            return null;
        int s = --size;//更新元素數
        modCount++;//修改次數+1
        E result = (E) queue[0];//獲取佇列頭部元素
        E x = (E) queue[s];//獲取尾部元素
        queue[s] = null;//末尾置空
        if (s != 0)//佇列中還有元素
            siftDown(0, x);
        return result;
    }
    /**
     * 元素重新排序
     */
     private void siftDown(int k, E x) {
        if (comparator != null)
            siftDownUsingComparator(k, x);
        else
            siftDownComparable(k, x);
    }

    /**
     * Comparable方式重新排序
     */
    private void siftDownComparable(int k, E x) {
        Comparable<? super E> key = (Comparable<? super E>)x;
        int half = size >>> 1;        // loop while a non-leaf
        while (k < half) {
            int child = (k << 1) + 1; // assume left child is least
            Object c = queue[child];
            int right = child + 1;
            if (right < size &&
                ((Comparable<? super E>) c).compareTo((E) queue[right]) > 0)
                c = queue[child = right];
            if (key.compareTo((E) c) <= 0)
                break;
            queue[k] = c;
            k = child;
        }
        queue[k] = key;
    }

    /**
     * Comparator方式重新排序
     */
    private void siftDownUsingComparator(int k, E x) {
        int half = size >>> 1;
        while (k < half) {
            int child = (k << 1) + 1;
            Object c = queue[child];
            int right = child + 1;
            if (right < size &&
                comparator.compare((E) c, (E) queue[right]) > 0)
                c = queue[child = right];
            if (comparator.compare(x, (E) c) <= 0)
                break;
            queue[k] = c;
            k = child;
        }
        queue[k] = x;
    }

由原始碼看出,PriorityQueue是非執行緒安全的,利用Comparator或Comparable進行自然排序,從而實現有優先順序的佇列,以平衡二叉樹的形式儲存在transient Object[] queue中,雖然api中介紹說是無界佇列,但從原始碼看出其實是有邊界的 ,值為Integer.MAX_VALUE;只是邊界特別大,從某種程度上來說,可以理解為無邊界。

PriorityBlockingQueue

public class PriorityBlockingQueue extends AbstractQueue implements BlockingQueue, java.io.Serializable

  1. 一個無界阻塞佇列,它使用與類 PriorityQueue 相同的順序規則,並且提供了阻塞獲取操作。
  2. 此佇列邏輯上是無界的,但是資源被耗盡時試圖執行 add 操作也將失敗(導致 OutOfMemoryError)。
  3. 不允許使用 null 元素。
  4. 不允許插入不可比較的物件。

成員變數

    //元素操作均基於PriorityQueue
    private final PriorityQueue<E> q;
    //可重入的互斥鎖,該處採用公平鎖
    private final ReentrantLock lock = new ReentrantLock(true);
    //take條件
    private final Condition notEmpty = lock.newCondition();

構造方法

    /**
     * 用預設的初始容量 (11) 建立一個 PriorityBlockingQueue,並根據元素的自然順序對其元素進行排序。
     */
    public PriorityBlockingQueue() {
        q = new PriorityQueue<E>();
    }

    /**
     * 使用指定的初始容量建立一個 PriorityBlockingQueue,並根據指定的比較器對其元素進行排序。
     */
    public PriorityBlockingQueue(int initialCapacity,
                                 Comparator<? super E> comparator) {
        q = new PriorityQueue<E>(initialCapacity, comparator);
    }

    /**
     * 使用指定的初始容量建立一個 PriorityBlockingQueue,並根據元素的自然順序對其元素進行排序。
     */
    public PriorityBlockingQueue(int initialCapacity) {
        q = new PriorityQueue<E>(initialCapacity, null);
    }

    /**
     * 建立一個包含指定 collection 元素的 PriorityBlockingQueue。
     */
    public PriorityBlockingQueue(Collection<? extends E> c) {
        q = new PriorityQueue<E>(c);
    }

常用方法

boolean add(E e):將指定元素插入此優先順序佇列。

    public boolean add(E e) {
        return offer(e);
    }

boolean offer(E e):將指定元素插入此優先順序佇列。

    public boolean offer(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();//阻塞式獲取鎖
        try {
            boolean ok = q.offer(e);//插入元素
            assert ok;
            notEmpty.signal();//喚醒take執行緒
            return true;
        } finally {
            lock.unlock();//釋放鎖
        }
    }

E peek():獲取但不移除此佇列的頭;如果此佇列為空,則返回 null。

    public E peek() {
        final ReentrantLock lock = this.lock;
        lock.lock();//阻塞式獲取鎖
        try {
            return q.peek();//獲取元素
        } finally {
            lock.unlock();//釋放鎖
        }
    }

E poll():獲取並移除此佇列的頭,如果此佇列為空,則返回 null。

    public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();//阻塞式獲取鎖
        try {
            return q.poll();//獲取並移除此佇列的頭
        } finally {
            lock.unlock();//釋放鎖
        }
    }

E take():獲取並移除此佇列的頭部,在元素變得可用之前一直等待(如果有必要)。

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();//阻塞式獲取鎖,可響應執行緒中斷
        try {
            try {
                while (q.size() == 0)
                    notEmpty.await();//佇列中無元素,阻塞等待,釋放鎖
            } catch (InterruptedException ie) {
                notEmpty.signal(); // propagate to non-interrupted thread
                throw ie;
            }
            E x = q.poll();//有元素,獲取並移除元素
            assert x != null;
            return x;//返回元素
        } finally {
            lock.unlock();//釋放鎖
        }
    }

void put(E e):將指定元素插入此優先順序佇列。

    public void put(E e) {
        offer(e); // never need to block
    }

有原始碼看出,PriorityBlockingQueue是基於PriorityQueue實現具有優先順序的無界阻塞佇列,利用ReentrantLock實現執行緒安全,Condition實現阻塞。

DelayQueue及PriorityBlockingQueue實現具有優先順序的無界阻塞佇列都是基於PriorityQueue的,區別在於DelayQueue加入了延遲概念。雖說都是無界,但最大邊界為:Integer.MAX_VALUE。