Java 進階——多執行緒優化之執行緒池 ThreadPoolExecutor的核心容器阻塞佇列詳解(一)
#引言 多執行緒我想無論是後端開發,還是對於App開發者來說都不會陌生,何況Android強制要求不能在主執行緒中做網路請求,於是乎,在很多初學者或者App的原始碼中會出現會多的new Thread…的方式,這樣的程式碼是不優雅而且存在很多的隱患,假如說在使用者退出App之後,如果執行緒內的工作還未執行完畢此時是無法被回收的,更不必說如果是無限迴圈的執行緒,那麼可能永遠無法回收,永遠佔著記憶體和CPU資源,這是多麼大的浪費,而且專案龐大了還不好維護,所以如果你還是每次new Thread的話,我建議你閱讀下這篇文章或者學習下ThreadPoolExecutor執行緒池,瞭解一哈,在進入執行緒池的使用之前,先介紹下執行緒池框架中的核心容器——阻塞佇列,由於這兩篇文章不僅僅是簡單講解如何使用,會涉及到背後的一些設計思想和原始碼分析,理論知識會比較多,部分內容整理摘自《Java 併發程式設計的藝術》和JDK文件。
#一、執行緒池中的阻塞佇列模型 在我們學習執行緒池種類之前,先得了解一下阻塞佇列模型的相關知識。阻塞佇列是一個支援兩個附加操作的佇列——在佇列為空時,獲取元素的執行緒會等待佇列變為非空;而當佇列滿時,儲存元素的執行緒會等待佇列可用。在多執行緒程式設計過程中為了業務解耦和架構設計,經常會使用併發容器用於儲存多執行緒間的共享資料,這樣不僅可以保證執行緒安全,還可以簡化各個執行緒操作。前面我們在介紹執行緒池的時候說過執行緒池機制其實本質上就是**“生產者–消費者”模型(阻塞佇列常在“生產者–消費者”模型中充當容器角色,生產者是往阻塞佇列裡新增元素的執行緒,消費者是從阻塞佇列裡拿元素的執行緒),那麼
- ArrayBlockingQueue ——一個由陣列結構組成的有界阻塞佇列。
- LinkedBlockingQueue ——一個由連結串列結構組成的有界阻塞佇列。
- PriorityBlockingQueue ——一個支援優先順序排序的無界阻塞佇列。
- DelayQueue——一個使用優先順序佇列實現的無界阻塞佇列。
- SynchronousQueue——一個**“不儲存”**元素的阻塞佇列。
- LinkedTransferQueue——一個由連結串列結構組成的無界阻塞佇列。
- LinkedBlockingDeque——一個由連結串列結構組成的雙向阻塞佇列。
#二、BlockingQueue 主要四種的“異常”處理邏輯 通過上面BlockingQueue 主要原始碼部分,可以知道噹噹前佇列操作(新增/讀取)不可達時,BlockingQueue 通常會針對不同種類的操作採取不同的處理措施:
package java.util.concurrent;
import java.util.Collection;
import java.util.Queue;
* <p>
* Usage example, based on a typical producer-consumer scenario.
* Note that a {@code BlockingQueue} can safely be used with multiple
* producers and multiple consumers.
* <pre> {@code
* class Producer implements Runnable {
* private final BlockingQueue queue;
* Producer(BlockingQueue q) { queue = q; }
* public void run() {
* try {
* while (true) { queue.put(produce()); }
* } catch (InterruptedException ex) { ... handle ...}
* }
* Object produce() { ... }
* }
*
* class Consumer implements Runnable {
* private final BlockingQueue queue;
* Consumer(BlockingQueue q) { queue = q; }
* public void run() {
* try {
* while (true) { consume(queue.take()); }
* } catch (InterruptedException ex) { ... handle ...}
* }
* void consume(Object x) { ... }
* }
*
* class Setup {
* void main() {
* BlockingQueue q = new SomeQueueImplementation();
* Producer p = new Producer(q);
* Consumer c1 = new Consumer(q);
* Consumer c2 = new Consumer(q);
* new Thread(p).start();
* new Thread(c1).start();
* new Thread(c2).start();
* }
* }}</pre>
*/
public interface BlockingQueue<E> extends Queue<E> {
/**
* Inserts the specified element into this queue if it is possible to do
* so immediately without violating capacity restrictions, returning
* {@code true} upon success and throwing an
* {@code IllegalStateException} if no space is currently available.
* When using a capacity-restricted queue, it is generally preferable to
* use {@link #offer(Object) offer}.
*
* @param e the element to add
* @return {@code true} (as specified by {@link Collection#add})
* @throws IllegalStateException if the element cannot be added at this
* time due to capacity restrictions
* @throws ClassCastException if the class of the specified element
* prevents it from being added to this queue
* @throws NullPointerException if the specified element is null
* @throws IllegalArgumentException if some property of the specified
* element prevents it from being added to this queue
*/
boolean add(E e);
/**
* Inserts the specified element into this queue if it is possible to do
* so immediately without violating capacity restrictions, returning
* {@code true} upon success and {@code false} if no space is currently
* available. When using a capacity-restricted queue, this method is
* generally preferable to {@link #add}, which can fail to insert an
* element only by throwing an exception.
*
* @param e the element to add
* @return {@code true} if the element was added to this queue, else
* {@code false}
* @throws ClassCastException if the class of the specified element
* prevents it from being added to this queue
* @throws NullPointerException if the specified element is null
* @throws IllegalArgumentException if some property of the specified
* element prevents it from being added to this queue
*/
boolean offer(E e);
/**
* Inserts the specified element into this queue, waiting if necessary
* for space to become available.
*
* @param e the element to add
* @throws InterruptedException if interrupted while waiting
* @throws ClassCastException if the class of the specified element
* prevents it from being added to this queue
* @throws NullPointerException if the specified element is null
* @throws IllegalArgumentException if some property of the specified
* element prevents it from being added to this queue
*/
void put(E e) throws InterruptedException;
/**
* Inserts the specified element into this queue, waiting up to the
* specified wait time if necessary for space to become available.
*
* @param e the element to add
* @param timeout how long to wait before giving up, in units of
* {@code unit}
* @param unit a {@code TimeUnit} determining how to interpret the
* {@code timeout} parameter
* @return {@code true} if successful, or {@code false} if
* the specified waiting time elapses before space is available
* @throws InterruptedException if interrupted while waiting
* @throws ClassCastException if the class of the specified element
* prevents it from being added to this queue
* @throws NullPointerException if the specified element is null
* @throws IllegalArgumentException if some property of the specified
* element prevents it from being added to this queue
*/
boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException;
/**
* Retrieves and removes the head of this queue, waiting if necessary
* until an element becomes available.
*
* @return the head of this queue
* @throws InterruptedException if interrupted while waiting
*/
E take() throws InterruptedException;
/**
* Retrieves and removes the head of this queue, waiting up to the
* specified wait time if necessary for an element to become available.
*
* @param timeout how long to wait before giving up, in units of
* {@code unit}
* @param unit a {@code TimeUnit} determining how to interpret the
* {@code timeout} parameter
* @return the head of this queue, or {@code null} if the
* specified waiting time elapses before an element is available
* @throws InterruptedException if interrupted while waiting
*/
E poll(long timeout, TimeUnit unit)
throws InterruptedException;
/**
* Removes a single instance of the specified element from this queue,
* if it is present. More formally, removes an element {@code e} such
* that {@code o.equals(e)}, if this queue contains one or more such
* elements.
* Returns {@code true} if this queue contained the specified element
* (or equivalently, if this queue changed as a result of the call).
*
* @param o element to be removed from this queue, if present
* @return {@code true} if this queue changed as a result of the call
* @throws ClassCastException if the class of the specified element
* is incompatible with this queue
* (<a href="../Collection.html#optional-restrictions">optional</a>)
* @throws NullPointerException if the specified element is null
* (<a href="../Collection.html#optional-restrictions">optional</a>)
*/
boolean remove(Object o);
}
|方法\處理方式| 丟擲異常 |返回特殊值 |一直阻塞| 超時退出 |-----|------|-------------|—| |插入方法| add(e) |offer(e)| put(e)| offer(e,time,unit) |移除方法| remove()| poll() |take()| poll(time,unit) |檢查方法| element()| peek() |不可用 |不可用
-
當執行**add(), remove(), element()**不可達時,丟擲異常 (指當阻塞佇列滿時候,再往佇列裡插入元素,會丟擲IllegalStateException(“Queue full”)異常;當佇列為空時,從佇列裡獲取元素時會丟擲NoSuchElementException異常 )
-
當執行offer(), poll(), peek(),返回特殊值(插入方法會返回布林值;移除方法則是從佇列裡拿出一個元素,如果沒有則返回null)
-
當執行**put(), take()**不可達時,一直阻塞當前執行緒,直到操作可以進行 (當阻塞佇列滿時,如果生產者執行緒往佇列裡put元素,佇列會一直阻塞生產者執行緒,直到拿到資料,或者響應中斷退出。當佇列空時,消費者執行緒試圖從佇列裡take元素,佇列也會阻塞消費者執行緒,直到佇列可用。)
-
當執行**offer, poll()**不可達時,阻塞一段時間,超時後退出 (當阻塞佇列滿\為空時,佇列會阻塞生產者\消費者執行緒一段時間,如果超過一定的時間,執行緒就會退出)
總之,BlockingQueue 中不允許有 null 元素,因此在 add(), offer(), put() 時如果引數是 null,會丟擲空指標,null 是用來有異常情況時做返回值的。
**注意:如果是無界阻塞佇列,佇列不可能會出現滿的情況,所以使用put或offer方法永遠不會被阻塞,而且使用offer方法時,該方法永遠返回true。。
#三、ArrayBlockingQueue ArrayBlockingQueue 是一個內部使用陣列實現的有界佇列,一旦建立後,容量不可變(因為陣列不可變長)。佇列中的元素按 FIFO 原則進行排序,每次佇列頭部讀取元素,並插入元素到尾部。預設ArrayBlockingQueue 不保證執行緒公平的訪問佇列(公平訪問佇列是指阻塞的所有生產者執行緒或消費者執行緒,當佇列可用時,可以按照阻塞的先後順序訪問佇列,即先阻塞的生產者執行緒,可以先往佇列裡插入元素,先阻塞的消費者執行緒,可以先從佇列裡獲取元素)但是也正因為不公平所以從一定程度上提高了吞吐量(非公平性是對先等待的執行緒是非公平的,當佇列可用時,阻塞的執行緒都可以爭奪訪問佇列的資格,有可能先阻塞的執行緒最後才訪問佇列)。
**注意:只是預設是不保證執行緒公平的訪問佇列並非不能支援,當然也可以通過對應的構造方法new ArrayBlockingQueue(100,true)來構造公平的阻塞佇列。
package java.util.concurrent;
import java.lang.ref.WeakReference;
import java.util.AbstractQueue;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
* A bounded {@linkplain BlockingQueue blocking queue} backed by an
* array. This queue orders elements FIFO (first-in-first-out). The
* <em>head</em> of the queue is that element that has been on the
* queue the longest time. The <em>tail</em> of the queue is that
* element that has been on the queue the shortest time. New elements
* are inserted at the tail of the queue, and the queue retrieval
* operations obtain elements at the head of the queue.
*
* <p>This is a classic "bounded buffer", in which a
* fixed-sized array holds elements inserted by producers and
* extracted by consumers. Once created, the capacity cannot be
* changed. Attempts to {@code put} an element into a full queue
* will result in the operation blocking; attempts to {@code take} an
* element from an empty queue will similarly block.
*
* <p>This class supports an optional fairness policy for ordering
* waiting producer and consumer threads. By default, this ordering
* is not guaranteed. However, a queue constructed with fairness set
* to {@code true} grants threads access in FIFO order. Fairness
* generally decreases throughput but reduces variability and avoids
* starvation.
*
* <p>This class and its iterator implement all of the
* <em>optional</em> methods of the {@link Collection} and {@link
* Iterator} interfaces.
* @param <E> the type of elements held in this queue
*/
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
private static final long serialVersionUID = -817911632652898426L;
/** The queued items 儲存佇列元素的陣列*/
final Object[] items;
/** items index for next take, poll, peek or remove 下一次讀取元素的索引 */
int takeIndex;
/** items index for next put, offer, or add 下一次插入元素的索引*/
int putIndex;
/** Number of elements in the queue 當前佇列中元素總數*/
int count;
/** Main lock guarding all access 所有操作的重入鎖,實現的訪問公平性,兩個 Condition 保證了插入和讀取元素的併發控制*/
final ReentrantLock lock;
/** Condition for waiting takes */
private final Condition notEmpty;
/** Condition for waiting puts */
private final Condition notFull;
/**
* Circularly decrements array index i.
*/
final int dec(int i) {
return ((i == 0) ? items.length : i) - 1;
}
/**
* Returns item at index i.
*/
@SuppressWarnings("unchecked")
final E itemAt(int i) {
return (E) items[i];
}
/**
* Extracts element at current take position, advances, and signals.
* Call only when holding lock.
*/
private E dequeue() {
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length) takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal();
return x;
}
/**
* Deletes item at array index removeIndex.
* Utility for remove(Object) and iterator.remove.
* Call only when holding lock.
*/
void removeAt(final int removeIndex) {
// assert lock.getHoldCount() == 1;
// assert items[removeIndex] != null;
// assert removeIndex >= 0 && removeIndex < items.length;
final Object[] items = this.items;
if (removeIndex == takeIndex) {
// removing front item; just advance
items[takeIndex] = null;
if (++takeIndex == items.length) takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
} else {
// an "interior" remove
// slide over all others up through putIndex.
for (int i = removeIndex, putIndex = this.putIndex;;) {
int pred = i;
if (++i == items.length) i = 0;
if (i == putIndex) {
items[pred] = null;
this.putIndex = pred;
break;
}
items[pred] = items[i];
}
count--;
if (itrs != null)
itrs.removedAt(removeIndex);
}
notFull.signal();
}
/**
* Creates an {@code ArrayBlockingQueue} with the given (fixed)
* capacity and default access policy.
* @param capacity the capacity of this queue
* @throws IllegalArgumentException if {@code capacity < 1}
*/
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
/**
* Creates an {@code ArrayBlockingQueue} with the given (fixed)
* capacity and the specified access policy.
*
* @param capacity the capacity of this queue
* @param fair if {@code true} then queue accesses for threads blocked
* on insertion or removal, are processed in FIFO order;
* if {@code false} the access order is unspecified.
* @throws IllegalArgumentException if {@code capacity < 1}
* 根據指定的容量建立一個Object陣列,並初始化重入鎖和兩個併發控制條件
*/
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
/**
* Creates an {@code ArrayBlockingQueue} with the given (fixed)
* capacity, the specified access policy and initially containing the
* elements of the given collection,
* added in traversal order of the collection's iterator.
*
* @param capacity the capacity of this queue
* @param fair if {@code true} then queue accesses for threads blocked
* on insertion or removal, are processed in FIFO order;
* if {@code false} the access order is unspecified.
* @param c the collection of elements to initially contain
* @throws IllegalArgumentException if {@code capacity} is less than
* {@code c.size()}, or less than 1.
* @throws NullPointerException if the specified collection or any
* of its elements are null
*/
public ArrayBlockingQueue(int capacity, boolean fair,
Collection<? extends E> c) {
this(capacity, fair);
final ReentrantLock lock = this.lock;
lock.lock(); // Lock only for visibility, not mutual exclusion
try {
int i = 0;
try {
for (E e : c)
items[i++] = Objects.requireNonNull(e);
} catch (ArrayIndexOutOfBoundsException ex) {
throw new IllegalArgumentException();
}
count = i;
putIndex = (i == capacity) ? 0 : i;
} finally {
lock.unlock();
}
}
/**
* Inserts the specified element at the tail of this queue if it is
* possible to do so immediately without exceeding the queue's capacity,
* returning {@code true} upon success and throwing an
* {@code IllegalStateException} if this queue is full.
* @param e the element to add
* @return {@code true} (as specified by {@link Collection#add})
* @throws IllegalStateException if this queue is full
* @throws NullPointerException if the specified element is null
* 如果沒有超過佇列的容量,直接在隊尾處插入指定元素e,add(E) 呼叫了父類的方法,而父類裡沒有實現 offer(E),本質上還是呼叫自身的offer(E),如果返回 false 就丟擲異常。
*/
public boolean add(E e) {
return super.add(e);
}
/**
* Inserts the specified element at the tail of this queue if it is
* possible to do so immediately without exceeding the queue's capacity,
* returning {@code true} upon success and {@code false} if this queue
* is full. This method is generally preferable to method {@link #add},
* which can fail to insert an element only by throwing an exception.
*
* @throws NullPointerException if the specified element is null
*/
public boolean offer(E e) {
Objects.requireNonNull(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == items.length)
return false;
else {
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}
/**
* Inserts the specified element at the tail of this queue, waiting
* up to the specified wait time for space to become available if
* the queue is full.
*/
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
Objects.requireNonNull(e);
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length) {
if (nanos <= 0L)
return false;
nanos = notFull.awaitNanos(nanos);
}
enqueue(e);
return true;
} finally {
lock.unlock();
}
}
/**
* Inserts element at current put position, advances, and signals.Call only when holding lock.
* enqueue(E) 方法會將元素新增到陣列佇列尾部,如果新增元素後佇列滿了,就修改 putIndex 為 0 ,新增後呼叫 notEmpty.signal() 通知喚醒阻塞在獲取元素的執行緒
*/
private void enqueue(E x) {
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length) putIndex = 0;
count++;
notEmpty.signal();
}
/**
* Inserts the specified element at the tail of this queue, waiting
* for space to become available if the queue is full.
* put() 方法可以響應中斷,當佇列滿了,就呼叫 notFull.await() 阻塞等待,等有消費者獲取元素後繼續執行; 可以新增時還是呼叫 enqueue(E)。
*/
public void put(E e) throws InterruptedException {
Objects.requireNonNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
//take() 方法可以響應中斷,與 poll() 不同的是,如果佇列中沒有資料會一直阻塞等待,直到中斷或者有元素,有元素時還是呼叫 dequeue() 方法。
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0) {
if (nanos <= 0L)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
return dequeue();
} finally {
lock.unlock();
}
}
//直接返回陣列中隊尾的元素,並不會刪除元素。如果佇列中沒有元素返回的是 null
public E peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return itemAt(takeIndex); // null when queue is empty
} finally {
lock.unlock();
}
}
/**
* Returns the number of elements in this queue.
*/
public int size() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return count;
} finally {
lock.unlock();
}
}
/**
* Returns the number of additional elements that this queue can ideally
* (in the absence of memory or resource constraints) accept without
* blocking. This is always equal to the initial capacity of this queue
* less the current {@code size} of this queue.
*
* <p>Note that you <em>cannot</em> always tell if an attempt to insert
* an element will succeed by inspecting {@code remainingCapacity}
* because it may be the case that another thread is about to
* insert or remove an element.
*/
public int remainingCapacity() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return items.length - count;
} finally {
lock.unlock();
}
}
/**
* Removes a single instance of the specified element from this queue,
* if it is present. More formally, removes an element {@code e} such
* that {@code o.equals(e)}, if this queue contains one or more such
* elements.
* Returns {@code true} if this queue contained the specified element
* (or equivalently, if this queue changed as a result of the call).
*
* <p>Removal of interior elements in circular array based queues
* is an intrinsically slow and disruptive operation, so should
* be undertaken only in exceptional circumstances, ideally
* only when the queue is known not to be accessible by other
* threads.
*
* @param o element to be removed from this queue, if present
* @return {@code true} if this queue changed as a result of the call
*/
public boolean remove(Object o) {
if (o == null) return false;
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count > 0) {
final Object[] items = this.items;
final int putIndex = this.putIndex;
int i = takeIndex;
do {
if (o.equals(items[i])) {
removeAt(i);
return true;
}
if (++i == items.length) i = 0;
} while (i != putIndex);
}
return false;
} finally {
lock.unlock();
}
}
/**
* Returns {@code true} if this queue contains the specified element.
* More formally, returns {@code true} if and only if this queue contains
* at least one element {@code e} such that {@code o.equals(e)}.
*
* @param o object to be checked for containment in this queue
* @return {@code true} if this queue contains the specified element
*/
public boolean contains(Object o) {
if (o == null) return false;
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count > 0) {
final Object[] items = this.items;
final int putIndex = this.putIndex;
int i = takeIndex;
do {
if (o.equals(items[i]))
return true;
if (++i == items.length) i = 0;
} while (i != putIndex);
}
return false;
} finally {
lock.unlock();
}
}
/**
* Returns an array containing all of the elements in this queue, in
* proper sequence.
*
* <p>The returned array will be "safe" in that no references to it are
* maintained by this queue. (In other words, this method must allocate
* a new array). The caller is thus free to modify the returned array.
*
* <p>This method acts as bridge between array-based and collection-based
* APIs.
*
* @return an array containing all of the elements in this queue
*/
public Object[] toArray() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
final Object[] items = this.items;
final int end = takeIndex + count;
final Object[] a = Arrays.copyOfRange(items, takeIndex, end);
if (end != putIndex)
System.arraycopy(items, 0, a, items.length - takeIndex, putIndex);
return a;
} finally {
lock.unlock();
}
}
/**
* Returns an array containing all of the elements in this queue, in
* proper sequence; the runtime type of the returned array is that of
* the specified array. If the queue fits in the specified array, it
* is returned therein. Otherwise, a new array is allocated with the
* runtime type of the specified array and the size of this queue.
*
* <p>If this queue fits in the specified array with room to spare
* (i.e., the array has more elements than this queue), the element in
* the array immediately following the end of the queue is set to
* {@code null}.
*
* <p>Like the {@link #toArray()} method, this method acts as bridge between
* array-based and collection-based APIs. Further, this method allows
* precise control over the runtime type of the output array, and may,
* under certain circumstances, be used to save allocation costs.
*
* <p>Suppose {@code x} is a queue known to contain only strings.
* The following code can be used to dump the queue into a newly
* allocated array of {@code String}:
*
* <pre> {@code String[] y = x.toArray(new String[0]);}</pre>
*
* Note that {@code toArray(new Object[0])} is identical in function to
* {@code toArray()}.
*
* @param a the array into which the elements of the queue are to
* be stored, if it is big enough; otherwise, a new array of the
* same runtime type is allocated for this purpose
* @return an array containing all of the elements in this queue
* @throws ArrayStoreException if the runtime type of the specified array
* is not a supertype of the runtime type of every element in
* this queue
* @throws NullPointerException if the specified array is null
*/
@SuppressWarnings("unchecked")
public <T> T[] toArray(T[] a) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
final Object[] items = this.items;
final int count = this.count;
final int firstLeg = Math.min(items.length - takeIndex, count);
if (a.length < count) {
a = (T[]) Arrays.copyOfRange(items, takeIndex, takeIndex + count,
a.getClass());
} else {
System.arraycopy(items, takeIndex, a, 0, firstLeg);
if (a.length > count)
a[count] = null;
}
if (firstLeg < count)
System.arraycopy(items, 0, a, firstLeg, putIndex);
return a;
} finally {
lock.unlock();
}
}
public String toString() {
return Helpers.collectionToString(this);
}
/**
* Atomically removes all of the elements from this queue.
* The queue will be empty after this call returns.
*/
public void clear() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
int k = count;
if (k > 0) {
final Object[] items = this.items;
final int putIndex = this.putIndex;
int i = takeIndex;
do {
items[i] = null;
if (++i == items.length) i = 0;
} while (i != putIndex);
takeIndex = putIndex;
count = 0;
if (itrs != null)
itrs.queueIsEmpty();
for (; k > 0 && lock.hasWaiters(notFull); k--)
notFull.signal();
}
} finally {
lock.unlock();
}
}
/**
* Returns an iterator over the elements in this queue in proper sequence.
* The elements will be returned in order from first (head) to last (tail).
* @return an iterator over the elements in this queue in proper sequence
*/
public Iterator<E> iterator() {
return new Itr();//itrs 是 ArrayBlockingQueue 的內部類 Itrs 的物件,它的作用是保證迴圈陣列迭代時的正確性,具體實現比較複雜,這裡暫不介紹
}
部分程式碼略...
}
ArrayBlockingQueue的核心操作在此我們只關注佇列的建立、插入元素和獲取元素操作。 ##1、建立ArrayBlockingQueue 通過構造方法即可建立對應的例項。
方法 | 說明 |
---|---|
ArrayBlockingQueue(int capacity, boolean fair) | 建立指定容量的陣列,建立指定現場訪問策略的重入鎖和Condition |
ArrayBlockingQueue(int capacity) | 預設的建構函式只指定了佇列的容量並設定為非公平的執行緒訪問策略 |
**ArrayBlockingQueue(int capacity, boolean fair,Collection<? extends E> c) ** | 建立指定容量的陣列,建立指定現場訪問策略的重入鎖和Condition 並插入指定的元素 |
##2、向佇列中插入元素 在ArrayBlockingQueue中實現插入元素,有多個方法,本質上還是呼叫enqueue方法。
方法 | 說明 |
---|---|
add(E e) | 如果沒有超過佇列的容量,直接在隊尾處插入指定元素e,add(E) 呼叫了父類的方法,而父類裡沒有實現 offer(E),本質上還是呼叫自身的offer(E),如果返回 false 就丟擲異常。 |
offer(E e) | 先申請鎖,拿到之後如果立即將e插入佇列沒有超過最大容量,則呼叫enqueue將e插入佇列尾部,如果佇列已滿返回false |
offer(E,long,TimeUnit) | 與offer(E e)大體上功能類似,區別在於可以設定等待超時時間,若已超過還不能有位置則返回 false;否則呼叫 enqueue(E),然後返回 true。 |
enqueue(E x) | 如果新增元素後佇列滿了,就修改 putIndex 為 0 ;反之直接新增到陣列佇列尾部並呼叫 notEmpty.signal() 通知喚醒阻塞在獲取元素的執行緒 |
put() | 功能和offer型別,put() 方法可以響應中斷,當佇列滿了,就呼叫 notFull.await() 阻塞等待,等有消費者獲取元素後繼續執行 |
##3、獲取佇列中的元素 在ArrayBlockingQueue中實現獲取元素,有多個方法,本質上還是呼叫dequeue方法。
方法 | 說明 |
---|---|
E peek() | 直接返回陣列中隊尾的元素,並不會刪除元素。如果佇列中沒有元素返回的是 null |
E poll() | 選申請鎖,拿到鎖之後,如果在佇列中沒有元素時會立即返回 null;如果有元素呼叫 dequeue()返回 |
E poll(long timeout, TimeUnit unit)(E e) | 與offer(E e)大體上功能類似,區別在於可以允許阻塞一段時間,如果在阻塞一段時間還沒有元素進來,就返回 null |
take() | 與poll(E e)大體上功能類似,take() 方法可以響應中斷,如果佇列中沒有資料會一直阻塞等待,直到中斷或者有元素,有元素時還是呼叫 dequeue() 方法。 |
E dequeue() | 從隊首移除元素(即 takeIndex 位置)移除後會向後移動 takeIndex,如果已經到隊尾,就歸零,其實 ArrayBlockingQueue 是個環形陣列 |
概括起來ArrayBlockingQueue 使用可重入鎖 ReentrantLock 控制佇列的插入和獲取,兩個 Condition 實現生產者 - 消費者模型。可以看出put和take方法主要是通過condition的通知機制來完成可阻塞式的插入資料和獲取資料。
#四、LinkedBlockingQueue ArrayBlockingQueue 是一個使用陣列實現的阻塞佇列,而LinkedBlockingQueue則是使用連結串列實現的有界阻塞佇列,當構造物件時為指定佇列大小時,佇列預設大小為Integer.MAX_VALU,可以通過過載構造方法設定最大值,佇列中的元素按 FIFO 的原則進行排序,吞吐量比ArrayBlockingQueue 要大。
package java.util.concurrent;
import java.util.AbstractQueue;
import java.util.Collection;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
/**
* An optionally-bounded {@linkplain BlockingQueue blocking queue} based on
* linked nodes.
* This queue orders elements FIFO (first-in-first-out).
* The <em>head</em> of the queue is that element that has been on the
* queue the longest time.
* The <em>tail</em> of the queue is that element that has been on the
* queue the shortest time. New elements
* are inserted at the tail of the queue, and the queue retrieval
* operations obtain elements at the head of the queue.
* Linked queues typically have higher throughput than array-based queues but
* less predictable performance in most concurrent applications.
*
* <p>The optional capacity bound constructor argument serves as a
* way to prevent excessive queue expansion. The capacity, if unspecified,
* is equal to {@link Integer#MAX_VALUE}. Linked nodes are
* dynamically created upon each insertion unless this would bring the
* queue above capacity.
*
* <p>This class and its iterator implement all of the
* <em>optional</em> methods of the {@link Collection} and {@link
* Iterator} interfaces.
*/
public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {
/**
* Linked list node class. 單向連結串列的節點類
*/
static class Node<E> {
E item; //當前節點
/**
* 當前節點的後置節點有三種情況::
* - the real successor Node 後置節點存在,代表真正的後置節點
* - this Node, meaning the successor is head.next 取值為當前節點,代表後置節點為head.next
* - null, meaning there is no successor (this is the last node) 取值為null,說明該節點就是尾節點
*/
Node<E> next;
Node(E x) { item = x; }
}
/** The capacity bound, or Integer.MAX_VALUE if none 佇列的容量,預設為 Integer.MAX_VALUE*/
private final int capacity;
/** Current number of elements 當前佇列中的元素數量*/
private final AtomicInteger count = new AtomicInteger();
/**
* Head of linked list.
* Invariant: head.item == null 頭節點的head.item為null
*/
transient Node<E> head;
/**
* Tail of linked list.
* Invariant: last.next == null
*/
private transient Node<E> last;
/** Lock held by take, poll, 獲取元素的重入鎖 */
private final ReentrantLock takeLock = new ReentrantLock();
/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();
/** Lock held by put, offer, 插入元素的重入鎖 */
private final ReentrantLock putLock = new ReentrantLock();
/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();
/**
* Signals a waiting take. Called only from put/offer (which do not
* otherwise ordinarily lock takeLock.) 通過Condition 通知
*/
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
/**
* Links node at end of queue. 簡單的連結串列操作,在尾部新增節點node
* @param node the node
*/
private void enqueue(Node<E> node) {
last = last.next = node;
}
/**
* Removes a node from head of queue. 從連結串列頭部獲取元素
* @return the node
*/
private E dequeue() {
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
head = first;//
E x = first.item;
first.item = null;
return x;
}
/**
* Creates a {@code LinkedBlockingQueue} with a capacity of
* {@link Integer#MAX_VALUE}.
*/
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
/**
* Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity.
*
* @param capacity the capacity of this queue
* @throws IllegalArgumentException if {@code capacity} is not greater
* than zero
*/
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);//初始化頭結點
}
/**
* Creates a {@code LinkedBlockingQueue} with a capacity of
* {@link Integer#MAX_VALUE}, initially containing the elements of the
* given collection,
* added in traversal order of the collection's iterator.
*
* @param c the collection of elements to initially contain
* @throws NullPointerException if the specified collection or any
* of its elements are null
*/
public LinkedBlockingQueue(Collection<? extends E> c) {
this(Integer.MAX_VALUE);
final ReentrantLock putLock = this.putLock;
putLock.lock(); // Never contended, but necessary for visibility
try {
int n = 0;
for (E e : c) {//遍歷新增
if (e == null)
throw new NullPointerException();
if (n == capacity)
throw new IllegalStateException("Queue full");
enqueue(new Node<E>(e));
++n;
}
count.set(n);
} finally {
putLock.unlock();
}
}
/**
* Inserts the specified element at the tail of this queue, waiting if
* necessary for space to become available.
*
* @throws InterruptedException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
// Note: convention in all put/take/etc is to preset local var 所有的put、take 操作都預設一個區域性變數用於表示操作是否成功
// holding count negative to indicate failure unless set.
int c = -1;
Node<E> node = new Node<E>(e);//建立節點
final ReentrantLock putLock = this.putLock;//初始化put重入鎖
final AtomicInteger count = this.count;
putLock.lockInterruptibly();//申請put 重入鎖
try {
/*
* Note that count is used in wait guard even though it is
* not protected by lock. This works because count can
* only decrease at this point (all other puts are shut
* out by lock), and we (or some other waiting put) are
* signalled if it ever changes from capacity. Similarly
* for all other uses of count in other wait guards.
*/
while (count.get() == capacity) {
notFull.await();//佇列已滿則阻塞
}
enqueue(node);//否則入隊,直接新增到連結串列尾部
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();//發出未滿訊號,喚醒其他阻塞的新增執行緒。
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}
/**
* Inserts the specified element at the tail of this queue, waiting if
* necessary up to the specified wait time for space to become available.
*
* @return {@code true} if successful, or {@code false} if
* the specified waiting time elapses before space is available
* @throws InterruptedException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
if (e == null) throw new NullPointerException();
long nanos = unit.toNanos(timeout);
int c = -1;
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
while (count.get() == capacity) {
if (nanos <= 0L)
return false;
nanos = notFull.awaitNanos(nanos);
}
enqueue(new Node<E>(e));
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return true;
}
/**
* Inserts the specified element at the tail of this queue if it is
* possible to do so immediately without exceeding the queue's capacity,
* returning {@code true} upon success and {@code false} if this queue
* is full.
* When using a capacity-restricted queue, this method is generally
* preferable to method {@link BlockingQueue#add add}, which can fail to
* insert an element only by throwing an exception.
*
* @throws NullPointerException if the specified element is null
*/
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final AtomicInteger count = this.count;
if (count.get() == capacity)
return false;
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
if (count.get() < capacity) {
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
}
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return c >= 0;
}
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;//為什麼每次都要去拿一遍?TODO
takeLock.lockInterruptibly();//申請take 重入鎖
try {
while (count.get() == 0) {
notEmpty.await();//阻塞消費者執行緒
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
E x = null;
int c = -1;
long nanos = unit.toNanos(timeout);
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
if (nanos <= 0L)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
public E poll() {
final AtomicInteger count = this.count;
if (count.get() == 0)
return null;
E x = null;
int c = -1;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
if (count.get() > 0) {
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
}
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
public E peek() {
if (count.get() == 0)
return null;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
return (count.get() > 0) ? head.next.item : null;
} finally {
takeLock.unlock();
}
}
/**
* Unlinks interior Node p with predecessor trail.
*/
void unlink(Node<E> p, Node<E> trail) {
// assert isFullyLocked();
// p.next is not changed, to allow iterators that are
// traversing p to maintain their weak-consistency guarantee.
p.item = null;
trail.next = p.next;
if (last == p)
last = trail;
if (count.getAndDecrement() == capacity)
notFull.signal();
}
/**
* Removes a single instanc