ArrayBlockingQueue原始碼閱讀(1.8)
阿新 • • 發佈:2018-11-24
ArrayBlockingQueue原始碼閱讀
1、ArrayBlockingQueue類結構
public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable。ArrayBlockingQueue是BlockingQueue介面的一種實現,要了解它就必須清楚BlockingQueue的相關知識;
2、BlockingQueue介面介紹
在併發佇列上JDK提供了兩套實現,一個是以ConcurrentLinkedQueue為代表的高效能佇列,一個是以BlockingQueue介面為代表的阻塞佇列,無論哪種都繼承自Queue介面!,BlockingQueue的類繼承關係如下:
BlockingQueue介面重要方法如下:
- offer(anObject): 表示如果可能的話, 將anObject加到BlockingQueue裡,即如果BlockingQueue可以容納, 則返回true, 否則返回false.(本方法不阻塞當前執行方法的執行緒)。
- offer(E o, long timeout, TimeUnit unit), 可以設定等待的時間,如果在指定的時間內,還不能往佇列中加入BlockingQueue,則返回失敗。
- put(anObject): 把anObject加到BlockingQueue裡, 如果BlockQueue沒有空間, 則呼叫此方法的執行緒被阻斷直到BlockingQueue裡面有空間再繼續。
- poll(long timeout, TimeUnit unit):從BlockingQueue取出一個隊首的物件,如果在指定時間內,佇列一旦有資料可取,則立即返回佇列中的資料。否則知道時間超時還沒有資料可取,返回失敗,如果不指定超時時間,在沒有資料時立即返回失敗。
- take(): 取走BlockingQueue裡排在首位的物件,若BlockingQueue為空,阻斷進入等待狀態直到BlockingQueue有新的資料被加入。
- drainTo(): 一次性從BlockingQueue獲取所有可用的資料物件(還可以指定獲取資料的個數),通過該方法,可以提升獲取資料效率;不需要多次分批加鎖或釋放鎖。
3、原始碼分析
3.1、類屬性檢視
/** The queued items */ 以陣列作為資料結構
final Object[] items;
/** items index for next take, poll, peek or remove */ 佇列中下一個將被取出值的下標
int takeIndex;
/** items index for next put, offer, or add */ 佇列中下一個將被放入值的下標
int putIndex;
/** Number of elements in the queue */ 陣列元素數量
int count;
/*
* Concurrency control uses the classic two-condition algorithm 使用雙條件演算法
* found in any textbook.
*/
/** Main lock guarding all access */ 使用重入鎖(獨佔鎖)
final ReentrantLock lock;
/** Condition for waiting takes */ take時候用於等待的條件
private final Condition notEmpty;
/** Condition for waiting puts */ put時候用於等待的條件
private final Condition notFull;
3.2、建構函式分析
/**
- Creates an {@code ArrayBlockingQueue} with the given (fixed)
- capacity and default access policy.
- @param capacity the capacity of this queue
-
@throws IllegalArgumentException if {@code capacity < 1}
*/public ArrayBlockingQueue(int capacity) { this(capacity, false); //呼叫public ArrayBlockingQueue(int capacity, boolean fair)構造方法,預設使用非公平鎖 }
/**
- Creates an {@code ArrayBlockingQueue} with the given (fixed)
- capacity and the specified access policy.
- @param capacity the capacity of this queue
- @param fair if {@code true} then queue accesses for threads blocked
- on insertion or removal, are processed in FIFO order; //如果傳入的值為true即公平鎖,則需要維護一個有序佇列,保證先進先出的原則
- if {@code false} the access order is unspecified.
- @throws IllegalArgumentException if {@code capacity < 1}
*/public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; //建立指定容量的陣列 lock = new ReentrantLock(fair); //預設使用非公平鎖 notEmpty = lock.newCondition(); notFull = lock.newCondition(); }
/**
- Creates an {@code ArrayBlockingQueue} with the given (fixed)
- capacity, the specified access policy and initially containing the
- elements of the given collection,
- added in traversal order of the collection's iterator.
- @param capacity the capacity of this queue
- @param fair if {@code true} then queue accesses for threads blocked
- on insertion or removal, are processed in FIFO order;
- if {@code false} the access order is unspecified.
- @param c the collection of elements to initially contain 使用指定集合初始化佇列
- @throws IllegalArgumentException if {@code capacity} is less than
- {@code c.size()}, or less than 1.
- @throws NullPointerException if the specified collection or any
- of its elements are null
*/
//這個建構函式的核心就是c.size()與capacity的大小關係對比了
//如果c.size()>capacity那就會報錯,所以在初始化的時候要注意
public ArrayBlockingQueue(int capacity, boolean fair,
Collection<? extends E> c) {
this(capacity, fair); //先建立指定容量的陣列,以便集合中的元素存放
//這種寫法我們很常見,使用final表示引用不能改變,但又避免了直接使用成員變數
final ReentrantLock lock = this.lock;
//對佇列直接修改操作,需要先獲取獨佔鎖
lock.lock(); // Lock only for visibility, not mutual exclusion
try {
int i = 0;
try {
for (E e : c) {
checkNotNull(e);
items[i++] = e; //下標從0開始存放
}
} catch (ArrayIndexOutOfBoundsException ex) {
throw new IllegalArgumentException();
}
count = i; //將陣列元素個數返回給全域性變數
putIndex = (i == capacity) ? 0 : i; //修改下一次將被放入值的下標
} finally {
lock.unlock(); //解鎖
}
}