1. 程式人生 > >LinkedBlockingQueue原始碼解析(1)

LinkedBlockingQueue原始碼解析(1)

此文已由作者趙計剛授權網易雲社群釋出。

歡迎訪問網易雲社群,瞭解更多網易技術產品運營經驗。


1、對於LinkedBlockingQueue需要掌握以下幾點

  • 建立

  • 入隊(新增元素)

  • 出隊(刪除元素)

2、建立

Node節點內部類與LinkedBlockingQueue的一些屬性

    static class Node<E> {
        E item;//節點封裝的資料
        /**
         * One of:
         * - the real successor Node
         * - this Node, meaning the successor is head.next
         * - null, meaning there is no successor (this is the last node)
         */

        Node<E> next;//下一個節點
        Node(E x) { item = x; }
    }

    /** 指定連結串列容量  */
    private final int capacity;

    /** 當前的元素個數 */
    private final AtomicInteger count = new AtomicInteger(0);

    /** 連結串列頭節點 */
    private transient Node<E> head;

    /** 連結串列尾節點 */
    private transient Node<E> last;

    /** 出隊鎖 */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** 出隊等待條件 */
    private final Condition notEmpty = takeLock.newCondition();

    /** 入隊鎖 */
    private final ReentrantLock putLock = new ReentrantLock();

    /** 入隊等待條件 */
    private final Condition notFull = putLock.newCondition();

2.1、public LinkedBlockingQueue(int capacity)

使用方法:

Queue<String> abq = new LinkedBlockingQueue<String>(1000);

原始碼:

    /**
     * 建立一個 LinkedBlockingQueue,容量為指定容量
     */
    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);//初始化頭節點和尾節點,均為封裝了null資料的節點
    }

注意點:

  • LinkedBlockingQueue的組成一個連結串列+兩把鎖+兩個條件

 

2.2、public LinkedBlockingQueue()

使用方法:

Queue<String> abq = new LinkedBlockingQueue<String>();

原始碼:

    /**
     * 建立一個LinkedBlockingQueue,容量為整數最大值
     */
    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }

注意點:預設容量為整數最大值,可以看做沒有容量限制

 

3、入隊:

3.1、public boolean offer(E e)

原理:

  • 在隊尾插入一個元素, 如果佇列沒滿,立即返回true; 如果佇列滿了,立即返回false

使用方法:

  • abq.offer("hello1");

原始碼:

    /**
     * 在隊尾插入一個元素, 容量沒滿,可以立即插入,返回true; 佇列滿了,直接返回false
     * 注:如果使用了限制了容量的佇列,這個方法比add()好,因為add()插入失敗就會丟擲異常
     */
    public boolean offer(E e) {
        if (e == null)
            throw new NullPointerException();
        final AtomicInteger count = this.count;// 獲取佇列中的元素個數
        if (count.get() == capacity)// 佇列滿了
            return false;
        int c = -1;
        final ReentrantLock putLock = this.putLock;
        putLock.lock();// 獲取入隊鎖
        try {
            if (count.get() < capacity) {// 容量沒滿
                enqueue(e);// 入隊
                c = count.getAndIncrement();// 容量+1,返回舊值(注意)
                if (c + 1 < capacity)// 如果新增元素後的容量,還小於指定容量(說明在插入當前元素後,至少還可以再插一個元素)
                    notFull.signal();// 喚醒等待notFull條件的其中一個執行緒
            }
        } finally {
            putLock.unlock();// 釋放入隊鎖
        }
        if (c == 0)// 如果c==0,這是什麼情況?一開始如果是個空佇列,就會是這樣的值,要注意的是,上邊的c返回的是舊值
            signalNotEmpty();
        return c >= 0;
    }

    /**
     * 建立一個節點,並加入連結串列尾部
     * @param x
     */
    private void enqueue(E x) {
        /*
         * 封裝新節點,並賦給當前的最後一個節點的下一個節點,然後在將這個節點設為最後一個節點
         */
        last = last.next = new Node<E>(x);
    }

    private void signalNotEmpty() {
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();//獲取出隊鎖
        try {
            notEmpty.signal();//喚醒等待notEmpty條件的執行緒中的一個
        } finally {
            takeLock.unlock();//釋放出隊鎖
        }
    }

如果,入隊邏輯不懂,檢視最後總結部分入隊邏輯的圖,程式碼非常簡單,流程看註釋即可。

 

3.2、public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException

原理:

  • 在隊尾插入一個元素,,如果佇列已滿,則進入等待,直到出現以下三種情況:

    • 被喚醒

    • 等待時間超時

    • 當前執行緒被中斷

使用方法:

        try {
            abq.offer("hello2",1000,TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

原始碼:

    /**
     * 在隊尾插入一個元素,,如果佇列已滿,則進入等待,直到出現以下三種情況: 
     * 1、被喚醒 
     * 2、等待時間超時 
     * 3、當前執行緒被中斷
     */
    public boolean offer(E e, long timeout, TimeUnit unit)
            throws InterruptedException {

        if (e == null)
            throw new NullPointerException();
        long nanos = unit.toNanos(timeout);// 轉換為納秒
        int c = -1;
        final ReentrantLock putLock = this.putLock;// 入隊鎖
        final AtomicInteger count = this.count;// 總數量
        putLock.lockInterruptibly();
        try {
            while (count.get() == capacity) {// 容量已滿
                if (nanos <= 0)// 已經超時
                    return false;
                /*
                 * 進行等待: 在這個過程中可能發生三件事: 
                 * 1、被喚醒-->繼續當前這個while迴圈
                 * 2、超時-->繼續當前這個while迴圈 
                 * 3、被中斷-->丟擲中斷異常InterruptedException
                 */
                nanos = notFull.awaitNanos(nanos);
            }
            enqueue(e);// 入隊
            c = count.getAndIncrement();// 入隊元素數量+1
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return true;
    }

注意:

  • awaitNanos(nanos)是AQS中的一個方法,這裡就不詳細說了,有興趣的自己去檢視AQS的原始碼。


免費領取驗證碼、內容安全、簡訊傳送、直播點播體驗包及雲伺服器等套餐

更多網易技術、產品、運營經驗分享請點選


相關文章:
【推薦】 異常測試之Socket網路異常
【推薦】 分散式儲存系統可靠性系列三:設計模式