1. 程式人生 > >高併發第十三彈:J.U.C 佇列 SynchronousQueue.ArrayBlockingQueue.LinkedBlockingQueue.LinkedTransferQueue

高併發第十三彈:J.U.C 佇列 SynchronousQueue.ArrayBlockingQueue.LinkedBlockingQueue.LinkedTransferQueue

因為下一節會說執行緒池,要用執行緒池 那麼執行緒池有個很重要的引數 就是Queue的選擇

常用的佇列其實就兩種:

  先進先出(FIFO):先插入的佇列的元素也最先出佇列,類似於排隊的功能。從某種程度上來說這種佇列也體現了一種公平性。  後進先出(LIFO):後插入佇列的元素最先出佇列,這種佇列優先處理最近發生的事件。

常用queue的分類:

  ArrayBlockingQueue:一個由陣列結構組成的有界阻塞佇列。

  LinkedBlockingQueue:一個由連結串列結構組成的有界阻塞佇列。

  PriorityBlockingQueue:一個支援優先順序排序的無界阻塞佇列。

  LinkedBlockingDeque:一個由連結串列結構組成的雙向阻塞佇列。

  LinkedTransferQueue:一個由連結串列結構組成的無界阻塞佇列。

  SynchronousQueue:一個不儲存元素的阻塞佇列。

  DealyQueue:一個使用優先順序佇列實現的無界阻塞佇列。

這幾個queue都是

extends AbstractQueue<E>
    implements BlockingQueue<E> {

AbstractQueue<E>:優先佇列

AbstractQueue是 Java Collections Framework 的成員,是一個基於優先順序堆的極大

優先順序佇列。此佇列按照在構造時所指定的順序對元素排序,既可以根據元素的自然順序來指定排序,也可以根據 Comparator 來指定,這取決於使用哪種構造方法。優先順序佇列不允許 null 元素。依靠自然排序的優先順序佇列還不允許插入不可比較的物件。

AbstractQueue的add,remove,element方法分別基於offer,poll,peek的實現,但是當佇列為null時,丟擲異常,而不是返回false或null。offer,poll,peek,並沒有實現待子類擴充套件。清空,迴圈poll,直到為空。addAll為迴圈遍歷集合元素,add到佇列; 

總結:記住這是一個優先佇列即可

BlockingQueue:阻塞佇列

                                            

主要應用場景:生產者消費者模型,是執行緒安全的 

   多執行緒環境中,通過佇列可以很容易實現資料共享,比如經典的“生產者”和“消費者”模型中,通過佇列可以很便利地實現兩者之間的資料共享。假設我們有若干生產者執行緒,另外又有若干個消費者執行緒。如果生產者執行緒需要把準備好的資料共享給消費者執行緒,利用佇列的方式來傳遞資料,就可以很方便地解決他們之間的資料共享問題。但如果生產者和消費者在某個時間段內,萬一發生資料處理速度不匹配的情況呢?理想情況下,如果生產者產出資料的速度大於消費者消費的速度,並且當生產出來的資料累積到一定程度的時候,那麼生產者必須暫停等待一下(阻塞生產者執行緒),以便等待消費者執行緒把累積的資料處理完畢,反之亦然。然而,在concurrent包釋出以前,在多執行緒環境下,我們每個程式設計師都必須去自己控制這些細節,尤其還要兼顧效率和執行緒安全,而這會給我們的程式帶來不小的複雜度。好在此時,強大的concurrent包橫空出世了,而他也給我們帶來了強大的BlockingQueue。(在多執行緒領域:所謂阻塞,在某些情況下會掛起執行緒(即阻塞),一旦條件滿足,被掛起的執行緒又會自動被喚醒)

下面兩幅圖演示了BlockingQueue的兩個常見阻塞場景:       如上圖所示:當佇列中沒有資料的情況下,消費者端的所有執行緒都會被自動阻塞(掛起),直到有資料放入佇列。   如上圖所示:當佇列中填滿資料的情況下,生產者端的所有執行緒都會被自動阻塞(掛起),直到佇列中有空的位置,執行緒被自動喚醒。     這也是我們在多執行緒環境下,為什麼需要BlockingQueue的原因。作為BlockingQueue的使用者,我們再也不需要關心什麼時候需要阻塞執行緒,什麼時候需要喚醒執行緒,因為這一切BlockingQueue都給你一手包辦了。既然BlockingQueue如此神通廣大讓我們一起來見識下它的常用方法:BlockingQueue的核心方法:放入資料:  offer(anObject):表示如果可能的話,將anObject加到BlockingQueue裡,即如果BlockingQueue可以容納,    則返回true,否則返回false.(本方法不阻塞當前執行方法的執行緒)  offer(E o, long timeout, TimeUnit unit),可以設定等待的時間,如果在指定的時間內,還不能往佇列中    加入BlockingQueue,則返回失敗。  put(anObject):把anObject加到BlockingQueue裡,如果BlockQueue沒有空間,則呼叫此方法的執行緒被阻斷    直到BlockingQueue裡面有空間再繼續.獲取資料:  poll(time):取走BlockingQueue裡排在首位的物件,若不能立即取出,則可以等time引數規定的時間,    取不到時返回null;  poll(long timeout, TimeUnit unit):從BlockingQueue取出一個隊首的物件,如果在指定時間內,    佇列一旦有資料可取,則立即返回佇列中的資料。否則知道時間超時還沒有資料可取,返回失敗。  take():取走BlockingQueue裡排在首位的物件,若BlockingQueue為空,阻斷進入等待狀態直到    BlockingQueue有新的資料被加入;   drainTo():一次性從BlockingQueue獲取所有可用的資料物件(還可以指定獲取資料的個數),     通過該方法,可以提升獲取資料效率;不需要多次分批加鎖或釋放鎖。

BlockingQueue提供了四套方法,分別來進行插入、移除、檢查。每套方法在不能立刻執行時都有不同的反應。 

  • Throws Exceptions :如果不能立即執行就丟擲異常。
  • Special Value:如果不能立即執行就返回一個特殊的值。
  • Blocks:如果不能立即執行就阻塞
  • Times Out:如果不能立即執行就阻塞一段時間,如果過了設定時間還沒有被執行,則返回一個值

所以我們先來介紹以下具體子類

ArrayBlockingQueue :一個由陣列支援的有界佇列初始化時指定容量大小,一旦指定大小就不能再變.

基本結構

顧名思義 這是一個底層由陣列來儲存資料的

    /** The queued items */
    final Object[] items;

同時使用ReentrantLock 來確保併發安全的  

 /** Main lock guarding all access */
    final ReentrantLock lock;

構造方法

 public ArrayBlockingQueue(int capacity) {
        this(capacity, false);
    }
public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }

構造方法實際是通Lock 來確定公平性的

ArrayBlockingQueue詳解具體方法

2. LinkedBlockingQueue 一個由連結串列結構組成的有界阻塞佇列。

實現了一個內部類

    /**
     * Linked list node class
     */
    static class Node<E> {
        E item;

        /**
         * One of:
         * - the real successor Node
         * - this Node, meaning the successor is head.next
         * - null, meaning there is no successor (this is the last node)
         */
        Node<E> next;

        Node(E x) { item = x; }
    }

構造方法

public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }

 public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }

大小配置可選,如果初始化時指定了大小,那麼它就是有邊界的。不指定就無邊界(最大整型值)。內部實現是連結串列,採用FIFO形式儲存資料。

詳細方法深入理解LinkedBlockingQueue

3.LinkedBlockingDeque:一個由連結串列結構組成的雙向阻塞佇列。

LinkedBlockingDeque是雙向連結串列實現的雙向併發阻塞佇列。該阻塞佇列同時支援FIFO和FILO兩種操作方式,即可以從佇列的頭和尾同時操作(插入/刪除);並且,該阻塞佇列是支援執行緒安全。

此外,LinkedBlockingDeque還是可選容量的(防止過度膨脹),即可以指定佇列的容量。如果不指定,預設容量大小等於Integer.MAX_VALUE。

其實就多了一個可頭可尾的操作 

4. PriorityBlockingQueue:一個支援優先順序排序的無界阻塞佇列。

支援優先順序排序,那麼肯定需要排序的 所以 須是實現Comparable介面,佇列通過這個介面的compare方法確定物件的priority。當前和其他物件比較,如果compare方法返回負數,那麼在佇列裡面的優先順序就比較高.優先順序中傳入的實體物件

   比較規則:當前物件和其他物件做比較,當前優先順序大就返回-1,優先順序小就返回1

構造方法

add方法

   public boolean offer(E e) {
        if (e == null)
            throw new NullPointerException();
        final ReentrantLock lock = this.lock;
        lock.lock();
        int n, cap;
        Object[] array;
        while ((n = size) >= (cap = (array = queue).length))
            tryGrow(array, cap);
        try {
            Comparator<? super E> cmp = comparator;
            if (cmp == null)
                siftUpComparable(n, e, array);
            else
                siftUpUsingComparator(n, e, array, cmp);
            size = n + 1;
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
        return true;
    }

比較有趣也就是擴容方法了

 /**
     * Tries to grow array to accommodate at least one more element
     * (but normally expand by about 50%), giving up (allowing retry)
     * on contention (which we expect to be rare). Call only while
     * holding lock.
     *
     * @param array the heap array
     * @param oldCap the length of the array
     */
    private void tryGrow(Object[] array, int oldCap) {
        lock.unlock(); // must release and then re-acquire main lock
        Object[] newArray = null;
        if (allocationSpinLock == 0 &&
            UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
                                     0, 1)) {
            try {
                int newCap = oldCap + ((oldCap < 64) ?
                                       (oldCap + 2) : // grow faster if small
                                       (oldCap >> 1));
                if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow
                    int minCap = oldCap + 1;
                    if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
                        throw new OutOfMemoryError();
                    newCap = MAX_ARRAY_SIZE;
                }
                if (newCap > oldCap && queue == array)
                    newArray = new Object[newCap];
            } finally {
                allocationSpinLock = 0;
            }
        }
        if (newArray == null) // back off if another thread is allocating
            Thread.yield();
        lock.lock();
        if (newArray != null && queue == array) {
            queue = newArray;
            System.arraycopy(array, 0, newArray, 0, oldCap);
        }
    }

其先放開了鎖,然後通過CAS設定allocationSpinLock來判斷哪個執行緒獲得了擴容許可權,如果沒搶到許可權就會讓出CPU使用權。最後還是要鎖住開始真正的擴容。擴容許可權爭取到了就是計算大小,分配陣列。暫不肯定為什麼這麼麻煩要分配陣列的時候釋放鎖,暫猜測這樣做效率會更高。

測試類

public class ObjectBean implements Comparable<ObjectBean> {

    private String name;

    private Integer age;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public Integer getAge() {
        return age;
    }

    public void setAge(Integer age) {
        this.age = age;
    }

    @Override
    public String toString() {
        return "ObjectBean [name=" + name + ", age=" + age + "]";
    }

    @Override
    public int compareTo(ObjectBean o) {
        return this.age.compareTo(o.getAge());
    }
    

本人能力有限 這節寫的虎頭蛇尾.如果以後把資料結構學習一下再回來重寫吧