1. 程式人生 > >jdk 常用的queue

jdk 常用的queue

queue佇列,先進先出

1、優先順序佇列,元素有優先順序

public class PriorityQueue<E> extends AbstractQueue<E> implements java.io.Serializable

佇列使用堆排序,二叉樹,使用陣列儲存資料,非執行緒安全

入隊,新節點一直跟自己的父節點比較,不復合比較條件,交換位置,直到根節點

 public boolean offer(E e) {
        if (e == null)
            throw new NullPointerException();
        modCount++;
        int i = size;
        if (i >= queue.length)
            grow(i + 1);
        size = i + 1;
        if (i == 0)
            queue[0] = e;
        else
            siftUp(i, e);
        return true;
    }

 private void siftUp(int k, E x) {
        if (comparator != null)
            siftUpUsingComparator(k, x);
        else
            siftUpComparable(k, x);
    }

private void siftUpUsingComparator(int k, E x) {
        while (k > 0) {
            int parent = (k - 1) >>> 1;
            Object e = queue[parent];
            if (comparator.compare(x, (E) e) >= 0)
                break;
            queue[k] = e;
            k = parent;
        }
        queue[k] = x;
    }
出隊,取出根節點值,把最後一個節點放到根節點,然後比較根節點和左右兩個子節點,找到正確位置
 private void siftDownUsingComparator(int k, E x) {
        int half = size >>> 1;
        while (k < half) {
            int child = (k << 1) + 1;
            Object c = queue[child];
            int right = child + 1;
            if (right < size &&
                comparator.compare((E) c, (E) queue[right]) > 0)
                c = queue[child = right];
            if (comparator.compare(x, (E) c) <= 0)
                break;
            queue[k] = c;
            k = child;
        }
        queue[k] = x;
    }

對應執行緒安全的PriorityBlockingQueue,使用ReentrantLock,在入隊出隊總量等方法加鎖,出隊無值時候加入Condition佇列

2、延時佇列,元素有優先順序和延時時間,出隊時,時間未到,執行緒等待

public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E> 
返回並移出佇列頭,如果元素沒有到過期時間,則等待
 public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null)
                    available.await();
                else {
                    long delay = first.getDelay(NANOSECONDS);
                    if (delay <= 0)
                        return q.poll();
                    first = null; // don't retain ref while waiting
                    if (leader != null)// 已經有一個執行緒先來取值了,則當前執行緒無期等待
                        available.await();
                    else {
                        Thread thisThread = Thread.currentThread();//設定當前執行緒為最先來取值的執行緒,
                        leader = thisThread;
                        try {
                            available.awaitNanos(delay);//等待指定時間,到時間自動喚醒
                        } finally {
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            if (leader == null && q.peek() != null)
                available.signal();//leader執行緒需要取完值後喚醒等待佇列上的第一個執行緒,使它去嘗試取新的佇列頭
            lock.unlock();
        }
    }


3、阻塞佇列BlockingQueue,即入隊時空間不足,執行緒等待有空間時繼續入隊,出隊時佇列為空,執行緒等待直到有元素

常用的有ArrayBlockingQueue,LinkedBlockingQueue,SynchronousQueue

ArrayBlockingQueue,初始化有界陣列,陣列儲存

public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }

兩個Condition,分別用來等待入隊的執行緒,和出隊的執行緒,入隊後喚醒notEmpty,出隊後喚醒notFull

LinkedBlockingQueue,有界,或者預設Integer.MAX_VALUE,使用連結串列儲存節點,

 /** Lock held by take, poll, etc */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** Wait queue for waiting takes */
    private final Condition notEmpty = takeLock.newCondition();

    /** Lock held by put, offer, etc */
    private final ReentrantLock putLock = new ReentrantLock();

    /** Wait queue for waiting puts */
    private final Condition notFull = putLock.newCondition();
使用了兩個ReentrantLock分別控制入隊出隊,原理基本和ArrayBlockingQueue一致,Executors.newFixedThreadPool使用此佇列

SynchronousQueue,沒有容量或者"只有一個長度",入隊的執行緒直接阻塞,直到有出隊的執行緒出現,雙方交換資料,有點類似握手

有公平非公平兩種模式,預設非公平,