1. 程式人生 > >深入理解阻塞佇列(二)——ArrayBlockingQueue原始碼分析

深入理解阻塞佇列(二)——ArrayBlockingQueue原始碼分析

深入理解阻塞佇列(一)——基本結構中,介紹了BlockingQueue這一介面的子類以及子介面。本文主要就其中的一個實現類:ArrayBlockingQueue進行原始碼分析,分析阻塞佇列的阻塞是如何實現的。

概述

ArrayBlockingQueue底層是使用一個數組實現佇列的,並且在構造ArrayBlockingQueue時需要指定容量,也就意味著底層陣列一旦建立了,容量就不能改變了,因此ArrayBlockingQueue是一個容量限制的阻塞佇列。因此,在佇列全滿時執行入隊將會阻塞,在佇列為空時出隊同樣將會阻塞。

原始碼分析

重要欄位

ArrayBlockingQueue的重要欄位有如下幾個:

 /** The queued items */
    final Object[] items;

/** Main lock guarding all access */
    final ReentrantLock lock;

    /** Condition for waiting takes */
    private final Condition notEmpty;

    /** Condition for waiting puts */
    private final Condition notFull;

上面程式碼中的items陣列就代表的是佇列,ReentrantLock和兩個Condition都是用於用於併發的,並且這幾個欄位都是final的,意味著ArrayBloackingQueue初始化時就必須完成賦值。
ArrayBlockingQueue中有幾個int型的欄位表示當前操作items陣列的索引,如下:

    //記錄下一個take、remove、peek的索引
    int takeIndex;

    //記錄下一個put、offer、add的索引
    int putIndex;

    //佇列中元素的個數
    int count;

構造方法

ArrayBlockingQueue一共有三個構造方法,如下:

    //只指定容量
    public ArrayBlockingQueue(int capacity) {
        this(capacity, false);
    }

    //指定容量和ReentrantLock是否公平
    public
ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); } //將集合中的元素初始化佇列的元素 public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) { this(capacity, fair); final ReentrantLock lock = this.lock; lock.lock(); // Lock only for visibility, not mutual exclusion try { int i = 0; try { for (E e : c) { checkNotNull(e); items[i++] = e; } } catch (ArrayIndexOutOfBoundsException ex) { throw new IllegalArgumentException(); } count = i; putIndex = (i == capacity) ? 0 : i; } finally { lock.unlock(); } }

從上面的程式碼可以看到,構造方法主要使用容量對items陣列完成初始化,fair引數用來構造一個公平的或不公平的ReentrantLock。對於不瞭解ReentrantLock的朋友,可以參考下面這篇文章: AbstractQueuedSynchronizer詳解(一)——分析ReentrantLock原始碼
另外一個構造方法就是使用集合中的元素初始化佇列中的元素。

put(E e)方法

put(E e)方法在佇列不滿的情況下,將會將元素新增到佇列尾部,如果佇列已滿,將會阻塞,直到佇列中有剩餘空間可以插入。該方法的實現如下:

 public void put(E e) throws InterruptedException {
        //檢查元素是否為null,如果是,丟擲NullPointerException
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        //加鎖
        lock.lockInterruptibly();
        try {
            //如果佇列已滿,阻塞,等待佇列成為不滿狀態
            while (count == items.length)
                notFull.await();
            //將元素入隊
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }

從上面程式碼可以看到幾點:
1. ArrayBlockingQueue不允許元素為null
2. ArrayBlockingQueue在佇列已滿時將會呼叫notFull的await()方法釋放鎖並處於阻塞狀態
3. 一旦ArrayBlockingQueue不為滿的狀態,就將元素入隊

下面首先看一下enqueue方法。

enqueue(E e)方法

enqueue()方法用於將元素插入到佇列中,由於有元素進入了佇列,所以就通知了為空的Condition,釋放了因佇列為空而阻塞的執行緒。程式碼如下:

private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        notEmpty.signal();
    }

從上面的程式碼可以看到,底層的陣列使用的迴圈插入的方式;當一旦插入一個元素後,將呼叫notEmpty.signal()。
當呼叫put()方法時,由於會首先對Lock加鎖,然後再執行插入,所以當很多執行緒一起插入時,是執行緒安全的;而一旦進入lock塊中,噹噹前佇列已滿時,該執行緒就會被阻塞,直到佇列不再為滿的時候,可以重新獲取到鎖執行插入;在插入之後,由於新加了一個元素,需要通知因為空而阻塞的執行緒,所以需要呼叫notEmpty的signal方法。

E take()方法

take()方法用於取走隊頭的元素,當佇列為空時將會阻塞,直到佇列中有元素可取走時將會被釋放。其實現如下:

 public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        //首先加鎖
        lock.lockInterruptibly();
        try {
            //如果佇列為空,阻塞
            while (count == 0)
                notEmpty.await();
            //佇列不為空,呼叫dequeue()出隊
            return dequeue();
        } finally {
            //釋放鎖
            lock.unlock();
        }
    }

從上面可以看到take()流程和put()流程類似,一旦獲得了鎖之後,如果佇列為空,那麼將阻塞;否則呼叫dequeue()出隊一個元素。
下面看一下dequeue()方法。

dequeu()方法

private E dequeue() {
        // assert lock.getHoldCount() == 1;
        // assert items[takeIndex] != null;
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        //取走資料
        E x = (E) items[takeIndex];
        //置為null,以助gc
        items[takeIndex] = null;
        //迴圈取
        if (++takeIndex == items.length)
            takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued();
        //通知因佇列滿而阻塞的執行緒
        notFull.signal();
        return x;
    }

程式碼中註釋已經說明了dequeu的流程。
下面兩個過程聯合起來看,如果佇列已滿,那麼呼叫put時,因為呼叫了notFull.await(),那麼那個執行緒將會放棄鎖進入到阻塞狀態,這時一個執行緒取走了一個數據,呼叫了notFull.signal(),這時上一個執行緒有可能就被釋放了然後重新獲得了鎖,呼叫了enqueue()方法將元素插入到佇列中;如果佇列為空,執行take(),那麼由於呼叫了notEmpty.await(),該執行緒將會被阻塞,這時另一個執行緒執行了put()方法插入了一個元素,然後呼叫了notEmpty.signal(),這時取走執行緒被釋放了重新獲取了鎖取走了資料。這基本就是ArrayBlockingQueue的阻塞實現原理。

總結

根據分析原始碼可知,ArrayBlockingQueue的併發阻塞是通過ReentrantLock和Condition來實現的,關於Condition可以參考下面這篇文章:java Condition原始碼分析
ArrayBlockingQueue內部只有一把鎖,意味著同一時刻只有一個執行緒能進行入隊或者出隊的操作。 下一篇文章將介紹LinkedBlockingQueue。