1. 程式人生 > >Java源碼解析——集合框架(二)——ArrayBlockingQueue

Java源碼解析——集合框架(二)——ArrayBlockingQueue

not 結構 AS ray false 元素 模式 -- ins

ArrayBlockingQueue源碼解析

ArrayBlockingQueue是一個阻塞式的隊列,繼承自AbstractBlockingQueue,間接的實現了Queue接口和Collection接口。底層以數組的形式保存數據(實際上可看作一個循環數組)。常用的操作包括 add ,offer,put,remove,poll,take,peek。

一、類聲明

public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, Serializable

1)AbstractQueue提供了Queue接口的默認實現。

2)BlockingQueue接口定義了阻塞隊列必須實現的方法。

3)通過實現 java.io.Serializable 接口以啟用其序列化功能。未實現此接口的類將無法使其任何狀態序列化或反序列化。序列化接口沒有方法或字段,僅用於標識可序列化的語義。

二、成員變量

private final E[] items;//底層數據結構

private int takeIndex;//用來為下一個take/poll/remove的索引(出隊)

private int putIndex;//用來為下一個put/offer/add的索引(入隊)
private int count;//隊列中元素的個數 private final ReentrantLock lock;// private final Condition notEmpty;//等待出隊的條件 private final Condition notFull;//等待入隊的條件

三、構造方法

ArrayBlockingQueue提供了兩個構造方法:

/**
* 創造一個隊列,指定隊列容量,指定模式
* @param fair
* true:先來的線程先操作
* false:順序隨機
*/
public ArrayBlockingQueue(int capacity, boolean
fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = (E[]) new Object[capacity];//初始化類變量數組items lock = new ReentrantLock(fair);//初始化類變量鎖lock notEmpty = lock.newCondition();//初始化類變量notEmpty Condition notFull = lock.newCondition();//初始化類變量notFull Condition } /** * 創造一個隊列,指定隊列容量,默認模式為非公平模式 * @param capacity <1會拋異常 */ public ArrayBlockingQueue(int capacity) { this(capacity, false); }

ArrayBlockingQueue的組成:一個對象數組+1把鎖ReentrantLock+2個條件Condition

三、成員方法

  • 入隊方法

  ArrayBlockingQueue的添加數據方法有add,put,offer這3個方法,總結如下:

  add方法內部調用offer方法,如果隊列滿了,拋出IllegalStateException異常,否則返回true

  offer方法如果隊列滿了,返回false,否則返回true

  add方法和offer方法不會阻塞線程,put方法如果隊列滿了會阻塞線程,直到有線程消費了隊列裏的數據才有可能被喚醒。

  這3個方法內部都會使用可重入鎖保證原子性。

1)add方法:

public boolean add(E e) {
    if (offer(e))
        return true;
    else
        throw new IllegalStateException("Queue full");
}

2)offer方法:

在隊尾插入一個元素, 如果隊列沒滿,立即返回true; 如果隊列滿了,立即返回false。因為使用的是ReentrantLock重入鎖,所以需要顯式地加鎖和釋放鎖。

public boolean offer(E e) {
        if (e == null)
            throw new NullPointerException();
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (count == items.length)//數組滿了
                return false;
            else {//數組沒滿
                insert(e);//插入一個元素
                return true;
            }
        } finally {
            lock.unlock();
        }
}

在插入元素結束後,喚醒等待notEmpty條件(即獲取元素)的線程。

/**
* 在隊尾插入一個元素,並設置了超時等待的時間
* 如果數組已滿,則進入等待,直到出現以下三種情況:
* 1、被喚醒
* 2、等待時間超時
* 3、當前線程被中斷
*/
public boolean offer(E e, long timeout, TimeUnit unit)throws InterruptedException {
  if (e == null)
    throw new NullPointerException();
  long nanos = unit.toNanos(timeout);//將超時時間轉換為納秒
  final ReentrantLock lock = this.lock;
        /*
         * lockInterruptibly():
         * 1、 在當前線程沒有被中斷的情況下獲取鎖。
         * 2、如果獲取成功,方法結束。
         * 3、如果鎖無法獲取,當前線程被阻塞,直到下面情況發生:
         * 1)當前線程(被喚醒後)成功獲取鎖
         * 2)當前線程被其他線程中斷
         * 
         * lock()
         * 獲取鎖,如果鎖無法獲取,當前線程被阻塞,直到鎖可以獲取並獲取成功為止。
         */
  lock.lockInterruptibly();//加可中斷的鎖
  try {
    for (;;) {
      if (count != items.length) {//隊列未滿
        insert(e);
        return true;
      }
      if (nanos <= 0)//已超時
        return false;
      try {
        /*
        * 進行等待:
        * 在這個過程中可能發生三件事:
        * 1、被喚醒-->繼續當前這個for(;;)循環
        * 2、超時-->繼續當前這個for(;;)循環
        * 3、被中斷-->之後直接執行catch部分的代碼
        */
        nanos = notFull.awaitNanos(nanos);//進行等待(在此過程中,時間會流失,在此過程中,線程也可能被喚醒)
      } catch (InterruptedException ie) {//在等待的過程中線程被中斷
        notFull.signal(); // 喚醒其他未被中斷的線程
        throw ie;
      }
    }
  } finally {
    lock.unlock();
  }
}

無論是第一個offer方法還是第二個offer方法都調用了insert方法,insert方法的步驟是首先添加元素,然後利用inc函數進行索引的添加,最後會喚醒因為隊列中沒有數據而等待被阻塞的獲取數據的方法。

private void insert(E x) {
    items[putIndex] = x; // 元素添加到數組裏
    putIndex = inc(putIndex); // 放數據索引+1,當索引滿了變成0
    ++count; // 元素個數+1
    notEmpty.signal(); // 使用條件對象notEmpty通知,比如使用take方法的時候隊列裏沒有數據,被阻塞。這個時候隊列insert了一條數據,需要調用signal進行通知
}

其中inc函數來改變索引的增加:

final int inc(int i) {
    return (++i == items.length) ? 0 : I;
}

3)put方法

/**
* 在隊尾插入一個元素
* 如果隊列滿了,一直阻塞,直到數組不滿了或者線程被中斷
*/
public void put(E e) throws InterruptedException {
  if (e == null)
    throw new NullPointerException();
  final E[] items = this.items;
  final ReentrantLock lock = this.lock;
  lock.lockInterruptibly();
  try {
    try {
      while (count == items.length)//隊列滿了,一直阻塞在這裏
        /*
        * 一直等待條件notFull,即被其他線程喚醒
        * (喚醒其實就是,有線程將一個元素出隊了,然後調用notFull.signal()喚醒其他等待這個條件的線程,同時隊列也不慢了)
         */
        notFull.await();
    } catch (InterruptedException ie) {//如果被中斷
      notFull.signal(); // 喚醒其他等待該條件(notFull,即入隊)的線程
      throw ie;
    }
    insert(e);
  } finally {
    lock.unlock();
  }
}

  • 出隊方法

ArrayBlockingQueue有不同的幾個數據刪除方法,poll、take、remove方法。

ArrayBlockingQueue的刪除數據方法有poll,take,remove這3個方法,總結如下:

poll方法對於隊列為空的情況,返回null,否則返回隊列頭部元素。

remove方法取的元素是基於對象的下標值,刪除成功返回true,否則返回false。

poll方法和remove方法不會阻塞線程。

take方法對於隊列為空的情況,會阻塞並掛起當前線程,直到有數據加入到隊列中。

這3個方法內部都會調用notFull.signal方法通知正在等待隊列滿情況下的阻塞線程。

1)poll方法

public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock(); // 加鎖,保證調用poll方法的時候只有1個線程
    try {
        return (count == 0) ? null : extract(); // 如果隊列裏沒元素了,返回null,否則調用extract方法
    } finally {
        lock.unlock(); // 釋放鎖,讓其他線程可以調用poll方法
    }
}

poll方法內部調用extract方法:

private E extract() {
  final E[] items = this.items;
  E x = items[takeIndex];//獲取出隊元素
  items[takeIndex] = null;//將出隊元素位置置空
  /*
  * 第一次出隊的元素takeIndex==0,第二次出隊的元素takeIndex==1
  * (註意:這裏出隊之後,並沒有將後面的數組元素向前移)
  */
  takeIndex = inc(takeIndex);
  --count;//數組元素個數-1
  notFull.signal();//數組已經不滿了,喚醒其他等待notFull條件的線程
  return x;//返回出隊的元素
}

同樣地notfull標誌表示數組已經不滿,可以執行被阻塞的入隊操作。

2)take方法

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly(); // 加鎖,保證調用take方法的時候只有1個線程
    try {
        while (count == 0) // 如果隊列空,阻塞當前線程,並加入到條件對象notEmpty的等待隊列裏
            notEmpty.await(); // 線程阻塞並被掛起,同時釋放鎖
        return extract(); // 調用extract方法
    } finally {
        lock.unlock(); // 釋放鎖,讓其他線程可以調用take方法
    }
}

3)remove方法

public boolean remove(Object o) {
    if (o == null) return false;
    final Object[] items = this.items;
    final ReentrantLock lock = this.lock;
    lock.lock(); // 加鎖,保證調用remove方法的時候只有1個線程
    try {
        for (int i = takeIndex, k = count; k > 0; i = inc(i), k--) { // 遍歷元素
            if (o.equals(items[i])) { // 兩個對象相等的話
                removeAt(i); // 調用removeAt方法
                return true; // 刪除成功,返回true
            }
        }
        return false; // 刪除成功,返回false
    } finally {
        lock.unlock(); // 釋放鎖,讓其他線程可以調用remove方法
    }
}

以及

void removeAt(int i) {
    final Object[] items = this.items;
    if (i == takeIndex) { // 如果要刪除數據的索引是取索引位置,直接刪除取索引位置上的數據,然後取索引+1即可
        items[takeIndex] = null;
        takeIndex = inc(takeIndex);
    } else { // 如果要刪除數據的索引不是取索引位置,移動元素元素,更新取索引和放索引的值
        for (;;) {
            int nexti = inc(i);
            if (nexti != putIndex) {
                items[i] = items[nexti];
                i = nexti;
            } else {
                items[i] = null;
                putIndex = i;
                break;
            }
        }
    }
    --count; // 元素個數-1
    notFull.signal(); // 使用條件對象notFull通知,比如使用put方法放數據的時候隊列已滿,被阻塞。這個時候消費了一條數據,隊列沒滿了,就需要調用signal進行通知 
}

Java源碼解析——集合框架(二)——ArrayBlockingQueue