java並發之阻塞隊列LinkedBlockingQueue與ArrayBlockingQueue
Java中阻塞隊列接口BlockingQueue繼承自Queue接口,並提供put、take阻塞方法。兩個主要的阻塞類實現是ArrayBlockingQueue和LinkedBlockingQueue。阻塞隊列的主要方法
public interface BlockingQueue<E> extends Queue<E> { //將指定的元素插入到此隊列的尾部(如果立即可行且不會超過該隊列的容量) //在成功時返回 true,如果此隊列已滿,則拋IllegalStateException。 boolean add(E e);//將指定的元素插入到此隊列的尾部(如果立即可行且不會超過該隊列的容量) // 將指定的元素插入此隊列的尾部,如果該隊列已滿, //則在到達指定的等待時間之前等待可用的空間,該方法可中斷 boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException; //將指定的元素插入此隊列的尾部,如果該隊列已滿,則一直等到(阻塞)。 void put(E e) throws InterruptedException; //獲取並移除此隊列的頭部,如果沒有元素則等待(阻塞),//直到有元素將喚醒等待線程執行該操作 E take() throws InterruptedException; //獲取並移除此隊列的頭部,在指定的等待時間前一直等到獲取元素, //超過時間方法將結束 E poll(long timeout, TimeUnit unit) throws InterruptedException; //從此隊列中移除指定元素的單個實例(如果存在)。 boolean remove(Object o); } //除了上述方法還有繼承自Queue接口的方法 //獲取但不移除此隊列的頭元素,沒有則跑異常NoSuchElementExceptionE element(); //獲取但不移除此隊列的頭;如果此隊列為空,則返回 null。 E peek(); //獲取並移除此隊列的頭,如果此隊列為空,則返回 null。 E poll();
一、ArrayBlockQueue的原理與實現
ArrayBlockingQueue 是一個用數組實現的有界阻塞隊列,其內部按先進先出的原則對元素進行排序,其中put方法和take方法為添加和刪除的阻塞方法。其內部是通過重入鎖ReenterLock和Condition條件隊列實現的,用數組存儲所有的數據,用一個ReentrantLock來同時控制添加線程和移除線程的並發訪問,一個notEmpty條件對象存放等待或喚醒調用take方法的線程,用notFull條件對象存放或喚醒調用put方法的線程。takeIndex代表的是下一個方法(take,poll,peek,remove)被調用時獲取數組元素的索引,putIndex則代表下一個方法(put, offer, or add)被調用時元素添加到數組中的索引。
每次添加元素時添加到隊尾,獲取元素時從隊頭獲取。當putIndex索引等於數組長度時,要將putIndex重新設置為0,繼續從數組頭開始添加元素。
public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { /** 存儲數據的數組 */ final Object[] items; /**獲取數據的索引,主要用於take,poll,peek,remove方法 */ int takeIndex; /**添加數據的索引,主要用於 put, offer, or add 方法*/ int putIndex; /** 隊列元素的個數 */ int count; /** 控制並非訪問的鎖 */ final ReentrantLock lock; /**notEmpty條件對象,用於通知take方法隊列已有元素,可執行獲取操作 */ private final Condition notEmpty; /**notFull條件對象,用於通知put方法隊列未滿,可執行添加操作 */ private final Condition notFull; /** 叠代器 */ transient Itrs itrs = null; //構造方法源碼 public ArrayBlockingQueue(int capacity) { this(capacity, false); } public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); } //add方法實現,間接調用了offer(e) public boolean add(E e) { if (offer(e)) return true; else throw new IllegalStateException("Queue full"); } //offer方法 public boolean offer(E e) { checkNotNull(e);//檢查元素是否為null final ReentrantLock lock = this.lock; lock.lock();//加鎖 try { if (count == items.length)//判斷隊列是否滿 return false; else { enqueue(e);//添加元素到隊列 return true; } } finally { lock.unlock(); } } //入隊操作 private void enqueue(E x) { //獲取當前數組 final Object[] items = this.items; //通過putIndex索引對數組進行賦值 items[putIndex] = x; //索引自增,如果已是最後一個位置,重新設置 putIndex = 0; if (++putIndex == items.length) putIndex = 0; count++;//隊列中元素數量加1 //喚醒調用take()方法的線程,執行元素獲取操作。 notEmpty.signal(); } //put方法,阻塞時可中斷 public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly();//該方法可中斷 try { //當隊列元素個數與數組長度相等時,無法添加元素 while (count == items.length) //將當前調用線程掛起,添加到notFull條件隊列中等待喚醒 notFull.await(); enqueue(e);//如果隊列沒有滿直接添加。。 } finally { lock.unlock(); } } public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { //判斷隊列是否為null,不為null執行dequeue()方法,否則返回null return (count == 0) ? null : dequeue(); } finally { lock.unlock(); } } //刪除隊列頭元素並返回 private E dequeue() { //拿到當前數組的數據 final Object[] items = this.items; @SuppressWarnings("unchecked") //獲取要刪除的對象 E x = (E) items[takeIndex]; 將數組中takeIndex索引位置設置為null items[takeIndex] = null; //takeIndex索引加1並判斷是否與數組長度相等, //如果相等說明已到盡頭,恢復為0 if (++takeIndex == items.length) takeIndex = 0; count--;//隊列個數減1 if (itrs != null) itrs.elementDequeued();//同時更新叠代器中的元素數據 //刪除了元素說明隊列有空位,喚醒notFull條件對象添加線程,執行添加操作 notFull.signal(); return x; } public boolean remove(Object o) { if (o == null) return false; //獲取數組數據 final Object[] items = this.items; final ReentrantLock lock = this.lock; lock.lock();//加鎖 try { //如果此時隊列不為null,這裏是為了防止並發情況 if (count > 0) { //獲取下一個要添加元素時的索引 final int putIndex = this.putIndex; //獲取當前要被刪除元素的索引 int i = takeIndex; //執行循環查找要刪除的元素 do { //找到要刪除的元素 if (o.equals(items[i])) { removeAt(i);//執行刪除 return true;//刪除成功返回true } //當前刪除索引執行加1後判斷是否與數組長度相等 //若為true,說明索引已到數組盡頭,將i設置為0 if (++i == items.length) i = 0; } while (i != putIndex);//繼承查找 } return false; } finally { lock.unlock(); } } //根據索引刪除元素,實際上是把刪除索引之後的元素往前移動一個位置 void removeAt(final int removeIndex) { final Object[] items = this.items; //先判斷要刪除的元素是否為當前隊列頭元素 if (removeIndex == takeIndex) { //如果是直接刪除 items[takeIndex] = null; //當前隊列頭元素加1並判斷是否與數組長度相等,若為true設置為0 if (++takeIndex == items.length) takeIndex = 0; count--;//隊列元素減1 if (itrs != null) itrs.elementDequeued();//更新叠代器中的數據 } else { //如果要刪除的元素不在隊列頭部, //那麽只需循環叠代把刪除元素後面的所有元素往前移動一個位置 //獲取下一個要被添加的元素的索引,作為循環判斷結束條件 final int putIndex = this.putIndex; //執行循環 for (int i = removeIndex;;) { //獲取要刪除節點索引的下一個索引 int next = i + 1; //判斷是否已為數組長度,如果是從數組頭部(索引為0)開始找 if (next == items.length) next = 0; //如果查找的索引不等於要添加元素的索引,說明元素可以再移動 if (next != putIndex) { items[i] = items[next];//把後一個元素前移覆蓋要刪除的元 i = next; } else { //在removeIndex索引之後的元素都往前移動完畢後清空最後一個元素 items[i] = null; this.putIndex = i; break;//結束循環 } } count--;//隊列元素減1 if (itrs != null) itrs.removedAt(removeIndex);//更新叠代器數據 } notFull.signal();//喚醒添加線程 } //從隊列頭部刪除,隊列沒有元素就阻塞,可中斷 public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly();//中斷 try { //如果隊列沒有元素 while (count == 0) //執行阻塞操作 notEmpty.await(); return dequeue();//如果隊列有元素執行刪除操作 } finally { lock.unlock(); } } }
二、LinkedBlockingQueue原理與實現
LinkedBlockingQueue是一個由鏈表實現的有界隊列阻塞隊列,但大小默認值為Integer.MAX_VALUE,所以我們在使用LinkedBlockingQueue時建議手動傳值,為其提供我們所需的大小,避免隊列過大造成機器負載或者內存爆滿等情況。
一般鏈表隊列吞吐量要高於基於數組的阻塞隊列,因為其內部實現添加和刪除使用兩個ReetrantLock來控制並發執行。雖然鏈表隊列和數組隊列的API幾乎一樣,但其內部實現原理不同。與ArrayBlockingQueue不同的是,LinkedBlockingQueue內部分別使用了takeLock 和 putLock 對並發進行控制,也就是說,添加和刪除操作並不是互斥操作,可以同時進行,這樣也就可以大大提高吞吐量。這裏再次強調如果沒有給LinkedBlockingQueue指定容量大小,其默認值將是Integer.MAX_VALUE,如果存在添加速度大於刪除速度時候,有可能會內存溢出。
public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { /** * 節點類,用於存儲數據 */ static class Node<E> { E item; /** * One of: * - the real successor Node * - this Node, meaning the successor is head.next * - null, meaning there is no successor (this is the last node) */ Node<E> next; Node(E x) { item = x; } } /** 阻塞隊列的大小,默認為Integer.MAX_VALUE */ private final int capacity; /** 當前阻塞隊列中的元素個數 */ private final AtomicInteger count = new AtomicInteger(); /** * 阻塞隊列的頭結點 */ transient Node<E> head; /** * 阻塞隊列的尾節點 */ private transient Node<E> last; /** 獲取並移除元素時使用的鎖,如take, poll, etc */ private final ReentrantLock takeLock = new ReentrantLock(); /** notEmpty條件對象,當隊列沒有數據時用於掛起執行刪除的線程 */ private final Condition notEmpty = takeLock.newCondition(); /** 添加元素時使用的鎖如 put, offer, etc */ private final ReentrantLock putLock = new ReentrantLock(); /** notFull條件對象,當隊列數據已滿時用於掛起執行添加的線程 */ private final Condition notFull = putLock.newCondition(); public boolean offer(E e) { //添加元素為null直接拋出異常 if (e == null) throw new NullPointerException(); //獲取隊列的個數 final AtomicInteger count = this.count; //判斷隊列是否已滿 if (count.get() == capacity) return false; int c = -1; //構建節點 Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; putLock.lock(); try { //再次判斷隊列是否已滿,考慮並發情況 if (count.get() < capacity) { enqueue(node);//添加元素 c = count.getAndIncrement();//拿到當前未添加新元素時的隊列長度 //如果容量還沒滿 if (c + 1 < capacity) notFull.signal();//喚醒下一個添加線程,執行添加操作 } } finally { putLock.unlock(); } // 由於存在添加鎖和消費鎖,而消費鎖和添加鎖都會持續喚醒等到線程,因此count肯定會變化。 //這裏的if條件表示如果隊列中還有1條數據 if (c == 0) signalNotEmpty();//如果還存在數據那麽就喚醒消費鎖 return c >= 0; // 添加成功返回true,否則返回false } //入隊操作 private void enqueue(Node<E> node) { //隊列尾節點指向新的node節點 last = last.next = node; } //signalNotEmpty方法 private void signalNotEmpty() { final ReentrantLock takeLock = this.takeLock; takeLock.lock(); //喚醒獲取並刪除元素的線程 notEmpty.signal(); } finally { takeLock.unlock(); } } public boolean remove(Object o) { if (o == null) return false; fullyLock();//同時對putLock和takeLock加鎖 try { //循環查找要刪除的元素 for (Node<E> trail = head, p = trail.next; p != null; trail = p, p = p.next) { if (o.equals(p.item)) {//找到要刪除的節點 unlink(p, trail);//直接刪除 return true; } } return false; } finally { fullyUnlock();//解鎖 } } //兩個同時加鎖 void fullyLock() { putLock.lock(); takeLock.lock(); } void fullyUnlock() { takeLock.unlock(); putLock.unlock(); } public E poll() { //獲取當前隊列的大小 final AtomicInteger count = this.count; if (count.get() == 0)//如果沒有元素直接返回null return null; E x = null; int c = -1; final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { //判斷隊列是否有數據 if (count.get() > 0) { //如果有,直接刪除並獲取該元素值 x = dequeue(); //當前隊列大小減一 c = count.getAndDecrement(); //如果隊列未空,繼續喚醒等待在條件對象notEmpty上的消費線程 if (c > 1) notEmpty.signal(); } } finally { takeLock.unlock(); } //判斷c是否等於capacity,這是因為如果滿說明NotFull條件對象上 //可能存在等待的添加線程 if (c == capacity) signalNotFull(); return x; } private E dequeue() { Node<E> h = head;//獲取頭結點 Node<E> first = h.next; 獲取頭結的下一個節點(要刪除的節點) h.next = h; // help GC//自己next指向自己,即被刪除 head = first;//更新頭結點 E x = first.item;//獲取刪除節點的值 first.item = null;//清空數據,因為first變成頭結點是不能帶數據的,這樣也就刪除隊列的帶數據的第一個節點 return x; } public E take() throws InterruptedException { E x; int c = -1; //獲取當前隊列大小 final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly();//可中斷 try { //如果隊列沒有數據,掛機當前線程到條件對象的等待隊列中 while (count.get() == 0) { notEmpty.await(); } //如果存在數據直接刪除並返回該數據 x = dequeue(); c = count.getAndDecrement();//隊列大小減1 if (c > 1) notEmpty.signal();//還有數據就喚醒後續的消費線程 } finally { takeLock.unlock(); } //滿足條件,喚醒條件對象上等待隊列中的添加線程 if (c == capacity) signalNotFull(); return x; } }
三、數組阻塞隊列和鏈表阻塞隊列的區別
1.隊列大小有所不同,ArrayBlockingQueue是有界的初始化必須指定大小,而LinkedBlockingQueue可以是有界的也可以是無界的(Integer.MAX_VALUE),對於後者而言,當添加速度大於移除速度時,在無界的情況下,可能會造成內存溢出等問題。
2.數據存儲容器不同,ArrayBlockingQueue采用的是數組作為數據存儲容器,而LinkedBlockingQueue采用的則是以Node節點作為連接對象的鏈表。
3.由於ArrayBlockingQueue采用的是數組的存儲容器,因此在插入或刪除元素時不會產生或銷毀任何額外的對象實例,而LinkedBlockingQueue則會生成一個額外的Node對象。這可能在長時間內需要高效並發地處理大批量數據的時,對於GC可能存在較大影響。
4.兩者的實現隊列添加或移除的鎖不一樣,ArrayBlockingQueue實現的隊列中的鎖是沒有分離的,即添加操作和移除操作采用的同一個ReenterLock鎖,而LinkedBlockingQueue實現的隊列中的鎖是分離的,其添加采用的是putLock,移除采用的則是takeLock,這樣能大大提高隊列的吞吐量,也意味著在高並發的情況下生產者和消費者可以並行地操作隊列中的數據,以此來提高整個隊列的並發性能。
並發之阻塞隊列LinkedBlockingQueue與ArrayBlockingQueue
java並發之阻塞隊列LinkedBlockingQueue與ArrayBlockingQueue