1. 程式人生 > >ArrayBlockingQueue原始碼解析(2)

ArrayBlockingQueue原始碼解析(2)

此文已由作者趙計剛授權網易雲社群釋出。

歡迎訪問網易雲社群,瞭解更多網易技術產品運營經驗。


3.3、public void put(E e) throws InterruptedException

原理:

  • 在隊尾插入一個元素,如果佇列滿了,一直阻塞,直到陣列不滿了或者執行緒被中斷

使用方法:

        try {
            abq.put("hello1");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

原始碼:

    /**
     * 在隊尾插入一個元素
     * 如果佇列滿了,一直阻塞,直到陣列不滿了或者執行緒被中斷
     */
    public void put(E e) throws InterruptedException {
        if (e == null)
            throw new NullPointerException();
        final E[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            try {
                while (count == items.length)//佇列滿了,一直阻塞在這裡
                    /*
                     * 一直等待條件notFull,即被其他執行緒喚醒
                     * (喚醒其實就是,有執行緒將一個元素出隊了,然後呼叫notFull.signal()喚醒其他等待這個條件的執行緒,同時佇列也不慢了)
                     */
                    notFull.await();
            } catch (InterruptedException ie) {//如果被中斷
                notFull.signal(); // 喚醒其他等待該條件(notFull,即入隊)的執行緒
                throw ie;
            }
            insert(e);
        } finally {
            lock.unlock();
        }
    }

 

4、出隊

4.1、public E poll()

原理:

  • 如果沒有元素,直接返回null;如果有元素,將隊頭元素置null,但是要注意隊頭是隨時變化的,並非一直是items[0]。

使用方法:

abq.poll();

原始碼:

    /**
     * 出隊
     */
    public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (count == 0)//如果沒有元素,直接返回null,而非丟擲異常
                return null;
            E x = extract();
            return x;
        } finally {
            lock.unlock();
        }
    }

    /**
     * 出隊
     */
    private E extract() {
        final E[] items = this.items;
        E x = items[takeIndex];//獲取出隊元素
        items[takeIndex] = null;//將出隊元素位置置空
        /*
         * 第一次出隊的元素takeIndex==0,第二次出隊的元素takeIndex==1
         * (注意:這裡出隊之後,並沒有將後面的陣列元素向前移)
         */
        takeIndex = inc(takeIndex);
        --count;//陣列元素個數-1
        notFull.signal();//陣列已經不滿了,喚醒其他等待notFull條件的執行緒
        return x;//返回出隊的元素
    }

 

4.2、public E poll(long timeout, TimeUnit unit) throws InterruptedException

原理:

  • 從對頭刪除一個元素,如果陣列不空,出隊;如果陣列已空且已經超時,返回null;如果陣列已空且時間未超時,則進入等待,直到出現以下三種情況:

    • 被喚醒

    • 等待時間超時

    • 當前執行緒被中斷

使用方法:

        try {
            abq.poll(1000, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

原始碼:

    /**
     * 從對頭刪除一個元素,
     * 如果陣列不空,出隊;
     * 如果陣列已空,判斷時間是否超時,如果已經超時,返回null
     * 如果陣列已空且時間未超時,則進入等待,直到出現以下三種情況:
     * 1、被喚醒
     * 2、等待時間超時
     * 3、當前執行緒被中斷
     */
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);//將時間轉換為納秒
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                if (count != 0) {//陣列不空
                    E x = extract();//出隊
                    return x;
                }
                if (nanos <= 0)//時間超時
                    return null;
                try {
                    /*
                     * 進行等待:
                     * 在這個過程中可能發生三件事:
                     * 1、被喚醒-->繼續當前這個for(;;)迴圈
                     * 2、超時-->繼續當前這個for(;;)迴圈
                     * 3、被中斷-->之後直接執行catch部分的程式碼
                     */
                    nanos = notEmpty.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    notEmpty.signal(); // propagate to non-interrupted thread
                    throw ie;
                }

            }
        } finally {
            lock.unlock();
        }
    }

 

4.3、public E take() throws InterruptedException

原理:

  • 將隊頭元素出隊,如果佇列空了,一直阻塞,直到陣列不為空或者執行緒被中斷

使用方法:

        try {
            abq.take();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

原始碼:

    /**
     * 將隊頭元素出隊
     * 如果佇列空了,一直阻塞,直到陣列不為空或者執行緒被中斷
     */
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            try {
                while (count == 0)//如果陣列為空,一直阻塞在這裡
                    /*
                     * 一直等待條件notEmpty,即被其他執行緒喚醒
                     * (喚醒其實就是,有執行緒將一個元素入隊了,然後呼叫notEmpty.signal()喚醒其他等待這個條件的執行緒,同時佇列也不空了)
                     */
                    notEmpty.await();
            } catch (InterruptedException ie) {
                notEmpty.signal(); // propagate to non-interrupted thread
                throw ie;
            }
            E x = extract();
            return x;
        } finally {
            lock.unlock();
        }
    }

  

總結:

1、具體入隊與出隊的原理圖:這裡只說一種情況,見下圖,途中深色部分表示已有元素,淺色部分沒有元素。


 

上面這種情況是怎麼形成的呢?當佇列滿了,這時候,隊頭元素為items[0]出隊了,就形成上邊的這種情況。

假設現在又要出隊了,則現在的隊頭元素是items[1],出隊後就形成下面的情形。


 

出隊後,對頭元素就是items[2]了,假設現在有一個元素將要入隊,根據inc方法,我們可以得知,他要插入到items[0]去,入隊了形成下圖:


以上就是整個入隊出隊的流程,inc方法上邊已經給出,這裡再貼一遍:

    /**
     * i+1,陣列下標+1
     * 注意:這裡這樣寫的原因。
     */
    final int inc(int i) {
        return (++i == items.length) ? 0 : i;
    }

 

2、三種入隊對比:

  • offer(E e):如果佇列沒滿,立即返回true; 如果佇列滿了,立即返回false-->不阻塞

  • put(E e):如果佇列滿了,一直阻塞,直到陣列不滿了或者執行緒被中斷-->阻塞

  • offer(E e, long timeout, TimeUnit unit):在隊尾插入一個元素,,如果陣列已滿,則進入等待,直到出現以下三種情況:-->阻塞

    • 被喚醒

    • 等待時間超時

    • 當前執行緒被中斷

 

3、三種出對對比:

  • poll():如果沒有元素,直接返回null;如果有元素,出隊

  • take():如果佇列空了,一直阻塞,直到陣列不為空或者執行緒被中斷-->阻塞

  • poll(long timeout, TimeUnit unit):如果陣列不空,出隊;如果陣列已空且已經超時,返回null;如果陣列已空且時間未超時,則進入等待,直到出現以下三種情況:

    • 被喚醒

    • 等待時間超時

    • 當前執行緒被中斷


免費領取驗證碼、內容安全、簡訊傳送、直播點播體驗包及雲伺服器等套餐

更多網易技術、產品、運營經驗分享請點選


相關文章:
【推薦】 測試環境docker化—容器叢集編排實踐
【推薦】 JVM垃圾收集器(2)
【推薦】 為什麼我打的jar包沒有註解?