1. 程式人生 > >JAVA併發程式設計: PriorityQueue -》阻塞佇列 PriorityBlockingQueue

JAVA併發程式設計: PriorityQueue -》阻塞佇列 PriorityBlockingQueue

生活

一旦一種新技術開始滾動碾壓道路,如果你不能成為壓路機的一部分,那麼你就只能成為道路的一部分。

PriorityQueue

阻塞佇列裡的PriorityBlockingQueue基於PriorityQueue,所以在研究PriorityBlockingQueue之前要先研究一下PriorityQueue,這是一個有優先順序概念的佇列,是有順序的,他的順序是通過內部的比較器實現。
他的內部維護了一個數組,但不是一個簡單的陣列,其實是通過一個數組,維護一個二叉堆的資料結構。
因此先把二叉堆搞清楚。

啥是二叉堆

研究二叉堆,有必要先把堆的概念搞清楚,注意:這裡指代的堆,並不是java開發中常說的堆疊的那個堆哦。
那麼,堆是一種怎麼樣的資料結構呢?
堆通常是一個可以看成一棵樹的陣列物件,有以下兩個特徵:
1、堆的某個節點,總是不大於或者不小於父節點
2、堆是一顆完全樹。

二叉堆是一種特殊的堆,二叉樹是一顆完全二叉樹或者近似於完全二叉樹。
二叉堆又有兩種,最大堆和最小堆。
最大堆:父節點總是大於等於任何一個子節點
最小堆:父節點總是小於等於任何一個子節點。

來圖解表示下二叉堆

在這裡插入圖片描述

上圖是一個二叉堆(完全二叉樹),他的特點是在N層被填滿之前,不會開始第N+1層的填充,並且填充的順序是從左往右。
二叉堆又可以用陣列來表示:
如下圖
在這裡插入圖片描述
通過上圖可以發現一個規律,使用陣列實現的二叉堆,位置N上的元素,其左孩子在2N+1處,其右孩子在2N+2處,根節點是0。

由於ProtityQueue是一個有優先順序概念的佇列,因此可以使用二叉堆來實現,佇列的入隊和出隊,也可以通過二叉堆來實現,對應到二叉堆就是他的上移和下移,下面來圖解一下 二叉堆的上移和下移。

這些圖都是盜來的。。自己繪畫水平太差了~~

二叉堆的上移

下面圖解描述一下如何向二叉堆新增一個元素:
在這裡插入圖片描述
圖1是一個二叉堆 最小堆(完全二叉樹)
圖2在二叉樹的最後插入一個節點2
在這裡插入圖片描述
圖3 由於圖2中2的父節點6比它大,所以2和6交換
圖4 由於2的父節點5比它,所以2和5交換,
此時又是一個標準的二叉堆。

二叉堆的下移

下面來看下如何把二叉堆中第一個元素移出,即優先佇列中的出隊操作。

在這裡插入圖片描述
圖1一個二叉堆
圖2準備出隊最小元素1,先把最後一個元素8所在的位置刪除
在這裡插入圖片描述
圖3 最小節點1下兩個孩子節點,取最小節點3交換
圖4 最小節點1下兩個孩子節點,去最小節點4交換
在這裡插入圖片描述
此時根元素1的最小孩子節點9比8還大,直接用8覆蓋1。1即被刪除了。

PriorityQueue的建立

//容器陣列
 transient Object[] queue;

//構造器很多,總的來說指定容量和比較器,如果沒有指定比較器,要求存入的元素必須實現了Compareable介面,如果沒有指定容量預設11
public PriorityQueue() {
//預設11
        this(DEFAULT_INITIAL_CAPACITY, null);
    }

       public PriorityQueue(int initialCapacity) {
        this(initialCapacity, null);
    }

   public PriorityQueue(Comparator<? super E> comparator) {
        this(DEFAULT_INITIAL_CAPACITY, comparator);
    }


    public PriorityQueue(int initialCapacity,
                         Comparator<? super E> comparator) {
        // Note: This restriction of at least one is not actually needed,
        // but continues for 1.5 compatibility
        if (initialCapacity < 1)
            throw new IllegalArgumentException();
        this.queue = new Object[initialCapacity];
        this.comparator = comparator;
    }
   @SuppressWarnings("unchecked")
    public PriorityQueue(Collection<? extends E> c) {
        if (c instanceof SortedSet<?>) {
            SortedSet<? extends E> ss = (SortedSet<? extends E>) c;
            this.comparator = (Comparator<? super E>) ss.comparator();
            initElementsFromCollection(ss);
        }
        else if (c instanceof PriorityQueue<?>) {
            PriorityQueue<? extends E> pq = (PriorityQueue<? extends E>) c;
            this.comparator = (Comparator<? super E>) pq.comparator();
            initFromPriorityQueue(pq);
        }
        else {
            this.comparator = null;
            initFromCollection(c);
        }
    }

    @SuppressWarnings("unchecked")
    public PriorityQueue(PriorityQueue<? extends E> c) {
        this.comparator = (Comparator<? super E>) c.comparator();
        initFromPriorityQueue(c);
    }
    @SuppressWarnings("unchecked")
    public PriorityQueue(SortedSet<? extends E> c) {
        this.comparator = (Comparator<? super E>) c.comparator();
        initElementsFromCollection(c);
    }

PriorityQueue的入隊

public boolean offer(E e) {
//元素為空就報錯
        if (e == null)
            throw new NullPointerException();
        modCount++;
        int i = size;
        //容量不夠就擴容
        if (i >= queue.length)
        //擴容
            grow(i + 1);
        size = i + 1;
        //當是第一個元素時,就不用去比較只需要放在根節點即可
        if (i == 0)
            queue[0] = e;
        else
        //上移
            siftUp(i, e);
        return true;
    }

//擴容
private void grow(int minCapacity) {
        int oldCapacity = queue.length;
        // Double size if small; else grow by 50%
        //如果原長度小於64,則設定新容量 原來+原來的+2
        //否則 原來+原來/2
        int newCapacity = oldCapacity + ((oldCapacity < 64) ?
                                         (oldCapacity + 2) :
                                         (oldCapacity >> 1));
        // overflow-conscious code
        if (newCapacity - MAX_ARRAY_SIZE > 0)
            newCapacity = hugeCapacity(minCapacity);
        queue = Arrays.copyOf(queue, newCapacity);
    }
    //上移程式碼
    //根據是否有比較器選擇不一樣的方法
    private void siftUp(int k, E x) {
        if (comparator != null)
            siftUpUsingComparator(k, x);
        else
            siftUpComparable(k, x);
    }

    @SuppressWarnings("unchecked")
    private void siftUpComparable(int k, E x) {
        Comparable<? super E> key = (Comparable<? super E>) x;
        while (k > 0) {
        // 比較自己與父節點,比較不滿足條件就互換
            int parent = (k - 1) >>> 1;
            Object e = queue[parent];
            if (key.compareTo((E) e) >= 0)
                break;
            queue[k] = e;
            k = parent;
        }
        queue[k] = key;
    }

    @SuppressWarnings("unchecked")
    private void siftUpUsingComparator(int k, E x) {
    //同上
        while (k > 0) {
            int parent = (k - 1) >>> 1;
            Object e = queue[parent];
            if (comparator.compare(x, (E) e) >= 0)
                break;
            queue[k] = e;
            k = parent;
        }
        queue[k] = x;
    }

PriorityQueue的出隊

 public E poll() {
        if (size == 0)
            return null;
        int s = --size;
        modCount++;

	//取出第一個元素return出去
        E result = (E) queue[0];
        //取出最後一個元素
        E x = (E) queue[s];
        //把最後一個元素的位置置空
        queue[s] = null;
        if (s != 0)
            siftDown(0, x);
        return result;
    }
private void siftDown(int k, E x) {
        if (comparator != null)
            siftDownUsingComparator(k, x);
        else
            siftDownComparable(k, x);
    }

    @SuppressWarnings("unchecked")
    private void siftDownComparable(int k, E x) {
        Comparable<? super E> key = (Comparable<? super E>)x;
     //之所以迴圈一半,是因為超過一半的 2n+1就超過這個陣列長度,也就是沒有子節點了。 
        int half = size >>> 1;        // loop while a non-leaf
        while (k < half) {
            int child = (k << 1) + 1; // assume left child is least
            Object c = queue[child];
            int right = child + 1;

		//比較兩個孩子節點,取最小或者最大的交換
            if (right < size &&
                ((Comparable<? super E>) c).compareTo((E) queue[right]) > 0)
                c = queue[child = right];
            if (key.compareTo((E) c) <= 0)
                break;
            queue[k] = c;
            k = child;
        }
        //把最後一個元素放置在對應的位置上
        queue[k] = key;
    }

    @SuppressWarnings("unchecked")
    private void siftDownUsingComparator(int k, E x) {
    //同上
        int half = size >>> 1;
        while (k < half) {
            int child = (k << 1) + 1;
            Object c = queue[child];
            int right = child + 1;
            if (right < size &&
                comparator.compare((E) c, (E) queue[right]) > 0)
                c = queue[child = right];
            if (comparator.compare(x, (E) c) <= 0)
                break;
            queue[k] = c;
            k = child;
        }
        queue[k] = x;
    }

PriorityQueue的heapify

回到上面建立優先佇列的構造器中,有一些是直接通過一個集合來建立,這裡的方法是先把集合裡的元素轉成一個數組,然後通過heapify方法把這個陣列變成一個標準的二叉堆,本質的實現原理就是下移:

 private void heapify() {
        for (int i = (size >>> 1) - 1; i >= 0; i--)
            siftDown(i, (E) queue[i]);
    }

PriorityBlockingQueue

PriorityBlockingQueue無非就死在PriorityQueue的基礎上加上鎖和條件變數

 /**
     * Lock used for all public operations
     */
    private final ReentrantLock lock;

    /**
     * Condition for blocking when empty
     */
    private final Condition notEmpty;

注意到條件變數僅有notEmpty,用以阻塞當佇列為空時的出隊執行緒。
為啥沒有notFull,因為本身有擴容的操作。所以不存在容量上限的情況。

案例程式碼


public class PBQTest {

    public static void main(String[] args) {
        PriorityBlockingQueue queue = new PriorityBlockingQueue();
        new Thread(new PBQ(queue,new Student("小明",20))).start();
        new Thread(new PBQ(queue,new Student("小紅",17))).start();
        new Thread(new PBQ(queue,new Student("小剛",25))).start();
        new Thread(new PBQ(queue,new Student("小慌",31))).start();
        new Thread(new PBQ(queue,null)).start();
        new Thread(new PBQ(queue,null)).start();
        new Thread(new PBQ(queue,null)).start();
        new Thread(new PBQ(queue,null)).start();

    }

    static class PBQ implements  Runnable{

        private PriorityBlockingQueue queue;
        private Student student;

        @Override
        public void run() {

            if(student == null){
                try {
                    Thread.sleep(Long.valueOf(new Random().nextInt(5000)));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                try {
                    System.out.println(queue.take());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            else{
                queue.offer(student);
            }
        }

        public PBQ(PriorityBlockingQueue queue, Student student) {
            this.queue = queue;
            this.student = student;
        }
    }

    static class Student implements  Comparable<Student>{

        @Override
        public int compareTo(Student o) {
            return this.age>o.age?1:0;
        }

        private String name;
        private int age;

        public Student(String name, int age) {
            this.name = name;
            this.age = age;
        }

        @Override
        public String toString() {
            return String.format("name:%s,age:%s",name,age);
        }
    }
}
name:小紅,age:17
name:小明,age:20
name:小剛,age:25
name:小慌,age:31