1. 程式人生 > >JAVA併發程式設計:阻塞佇列-DelayQueue

JAVA併發程式設計:阻塞佇列-DelayQueue

生活

如果第一次你沒有成功,那麼稱之為1.0版,繼續加油。

DelayQueue的成員組成

今天來學習延時佇列,這個玩意兒也是非常重要,在定時器上有用到。
首先簡單瞭解下,延時佇列就是讓指定的資料再指定的時候以後出隊,也就是按照時間排序,因此它的核心確實是使用了昨天看的優先佇列。
下面先來看看DelayQueue的成員

//可重入鎖
    private transient final ReentrantLock lock = new ReentrantLock();
    //優先佇列
    private final PriorityQueue<E> q = new PriorityQueue<E>();
    //頭執行緒。如果不為空,說明之前的執行緒還沒等到資料,那後面的就繼續等著吧
    private Thread leader = null;
    //條件,是否允許,到達指定時間才允許
    private final Condition available = lock.newCondition();

DelayQueue建立

 public DelayQueue() {}

//這裡的addAll最後呼叫到offer,本質是遍歷集合裡的元素,
然後一個一個塞到優先佇列尾部,這樣效率不是不高嗎?
很奇怪,為啥沒有呼叫優先佇列直接傳入集合的構造器,
先組成一個數組,在headify..?

//另外需要注意的集合的元素必須實現Delay介面

 public DelayQueue(Collection<? extends E> c) {
        this.addAll(c);
    }


//可以看到這個介面的父介面是Comparable,這樣就滿足了塞入優先佇列的條件
public interface Delayed extends Comparable<Delayed> {
   long getDelay(TimeUnit unit);
}
    

DelayQueue入隊

入隊最終就呼叫了offer,不存在阻塞入隊,因為使用了優先佇列,
該佇列本身實現了擴容機制,不存在容量不足的情況。

 public boolean offer(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
        //呼叫優先佇列入隊
            q.offer(e);
            //如果根節點就是e,說明當前這個節點就是最快要出隊的,喚醒等待的執行緒即可
            if (q.peek() == e) {
                leader = null;
                available.signal();
            }
            return true;
        } finally {
            lock.unlock();
        }
    }

DelayQueue出隊

非阻塞出隊:

public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            E first = q.peek();
            //如果根節點是空或者還不到時間,就return null
            if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
                return null;
            else
                return q.poll();
        } finally {
            lock.unlock();
        }
    }

阻塞出隊:

阻塞出隊,知道取到資料或者被中斷
public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                //沒有資料就等待
                if (first == null)
                    available.await();
                else {
                    long delay = first.getDelay(TimeUnit.NANOSECONDS);
                    //根節點時間到了就直接取資料出去
                    if (delay <= 0)
                        return q.poll();
                        //如果時間沒到且有執行緒在前面就等待
                    else if (leader != null)
                    /
                        available.await();
                    else {
                    //如果時間沒到,且前面沒有執行緒,就設定自己為leader,最近一個到時間的由我獲取
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                        //等待指定時間,喚醒後清空自己的leader去取資料
                            available.awaitNanos(delay);
                        } finally {
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
        //如果我已經清空leader並且後面還有資料,喚醒後面等待的執行緒
            if (leader == null && q.peek() != null)
                available.signal();
            lock.unlock();
        }
    }