Java併發基礎-併發工具類(二)
併發工具類
本系列文章主要講解 Java
併發相關的內容,包括同步、鎖、訊號量、阻塞佇列、執行緒池等,整體思維導圖如下:

系列文章列表:
- ofollow,noindex" target="_blank">Java併發基礎-併發模型、基礎介面以及Thread
- Java併發基礎-同步和鎖
- Java併發基礎-併發工具類(一)
本文主要以例項講解 Semaphore
、阻塞佇列等內容。
Semaphore
基本概念和用途
Semaphore
常稱訊號量,其維護了一個許可集,可以用來控制執行緒併發數。執行緒呼叫 acquire()
方法去或者許可證,然後執行相關任務,任務完成後,呼叫 release()
方法釋放該許可證,讓其他阻塞的執行緒可以執行。
Semaphore
可以用於流量控制,尤其是一些公共資源有限的場景,比如資料庫連線。假設我們上面的賬戶餘額管理中的賬戶修改操作涉及到去更改 mysql
資料庫,為了避免資料庫併發太大,我們進行相關限制。
常用方法
Semaphore(int permits)
:構造方法,初始化許可證數量
void acquire()
:獲取許可證
void release()
:釋放許可證
int availablePermits()
:返回此訊號量中當前可用的許可證數。
int getQueueLength()
:返回正在等待獲取許可證的執行緒數。
boolean hasQueuedThreads()
:是否有執行緒正在等待獲取許可證。
void reducePermits(int reduction)
:減少reduction個許可證。是個protected方法。
Collection getQueuedThreads()
:返回所有等待獲取許可證的執行緒集合。是個protected方法。
執行示例
雖然在程式碼中設定了 20
個執行緒去執行,但同時設定了許可證的數量為 5
,因而實際的最大併發數還是 5
。
package com.aidodoo.java.concurrent; import java.util.concurrent.*; /** * Created by zhangkh on 2018/9/9. */ public class SemaphoreDemo { public static void main(String[] args){ Semaphore semaphore=new Semaphore(5); ExecutorService executorService = Executors.newFixedThreadPool(20); Account account=new Account(); for(int i=0;i<20;i++){ SpenderWithSemaphore spender = new SpenderWithSemaphore(account, semaphore); executorService.submit(spender); } executorService.shutdown(); } } class SpenderWithSemaphore implements Runnable { private final Account account; private final Semaphore semaphore; public SpenderWithSemaphore(Account account, Semaphore semaphore) { this.account = account; this.semaphore = semaphore; } @Override public void run() { try{ semaphore.acquire(); System.out.println(String.format("%s get a premit at time %s,change and save data to mysql",Thread.currentThread().getName(),System.currentTimeMillis()/1000)); Thread.sleep(2000); }catch (InterruptedException e){ e.printStackTrace(); }finally { //System.out.println(String.format("%s release a premit",Thread.currentThread().getName())); semaphore.release(); } } }
獲取許可證後,模擬操作 mysql
,我們讓執行緒睡眠 2
秒,程式輸出如下:
pool-1-thread-2 get a premit at time 1536480858,change and save data to mysql pool-1-thread-5 get a premit at time 1536480858,change and save data to mysql pool-1-thread-3 get a premit at time 1536480858,change and save data to mysql pool-1-thread-4 get a premit at time 1536480858,change and save data to mysql pool-1-thread-1 get a premit at time 1536480858,change and save data to mysql pool-1-thread-8 get a premit at time 1536480860,change and save data to mysql pool-1-thread-7 get a premit at time 1536480860,change and save data to mysql pool-1-thread-6 get a premit at time 1536480860,change and save data to mysql pool-1-thread-9 get a premit at time 1536480860,change and save data to mysql pool-1-thread-10 get a premit at time 1536480860,change and save data to mysql pool-1-thread-11 get a premit at time 1536480862,change and save data to mysql pool-1-thread-13 get a premit at time 1536480862,change and save data to mysql pool-1-thread-12 get a premit at time 1536480862,change and save data to mysql pool-1-thread-14 get a premit at time 1536480862,change and save data to mysql pool-1-thread-15 get a premit at time 1536480862,change and save data to mysql pool-1-thread-16 get a premit at time 1536480864,change and save data to mysql pool-1-thread-17 get a premit at time 1536480864,change and save data to mysql pool-1-thread-19 get a premit at time 1536480864,change and save data to mysql pool-1-thread-18 get a premit at time 1536480864,change and save data to mysql pool-1-thread-20 get a premit at time 1536480864,change and save data to mysql
可以看到前面 5
個執行緒同一時間 1536480858
獲得許可證,然後執行操作,並不是 20
個執行緒一起操作,這樣能降低對 mysql
資料庫的影響。
如果把上面 Semaphore
的構造方法中的許可證數量改為 20
,大家可以看到 20
個執行緒的執行時間基本一致。
原始碼實現
Semaphore
實現直接基於 AQS
,有公平和非公平兩種模式。公平模式即按照呼叫 acquire()
的順序依次獲得許可證,遵循 FIFO
(先進先出),非公平模式是搶佔式的,誰先搶到先使用。
public Semaphore(int permits, boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits); }
獲取許可證
acquire()
方法最終呼叫父類 AQS
中的 acquireSharedInterruptibly
方法。
public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0)//(1) doAcquireSharedInterruptibly(arg);//(2) }
(1):呼叫 tryAcquireShared
,嘗試去獲取許可證
(2):如果獲取失敗,則呼叫 doAcquireSharedInterruptibly
,將執行緒加入到等待佇列中
tryAcquireShared
方法由 Semaphore
的內部類,同時也是 AQS
的子類去實現,即 NonfairSync
和 FairSync
,下面我們以 NonfairSync
為例說明其實現。
protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires); }
而 nonfairTryAcquireShared
方法如下:
final int nonfairTryAcquireShared(int acquires) { for (;;) { int available = getState();//(1) int remaining = available - acquires;//(2) if (remaining < 0 || compareAndSetState(available, remaining)) (3) return remaining; } }
(1):獲取 state
的值,也就是總許可證數量
(2):計算本次申請後,剩餘的許可證數量
(3):如果剩餘的許可證數量大於 0
且通過 CAS
將 state
的值修改成功後,返回剩餘的許可證數量,否則繼續迴圈阻塞。
釋放許可證
release()
方法的呼叫最終會呼叫父類 AQS
的 releaseShared()
方法:
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) {//(1) doReleaseShared();//(2) return true; } return false; }
(1):嘗試釋放許可證
(2):如果釋放許可證成功,則通知阻塞的執行緒,讓其執行
tryReleaseShared
方法很簡單,基本上是 nonfairTryAcquireShared
的逆過程,即增加許可證的數量,並通過 CAS
修改 state
的值。
protected final boolean tryReleaseShared(int releases) { for (;;) { int current = getState(); int next = current + releases; if (next < current) // overflow throw new Error("Maximum permit count exceeded"); if (compareAndSetState(current, next)) return true; } }
BlockingQueue
基本概念
阻塞佇列主要是解決如何高效安全傳輸資料的問題,此外能降低程式耦合度,讓程式碼邏輯更加清晰。
其繼承了 Queue
,並在其基礎上支援了兩個附加的操作:
- 當佇列為空時,獲取元素的執行緒會阻塞,等待佇列變為非空
- 當佇列滿時,新增元素的執行緒會阻塞,等待佇列可用
比較典型的使用場景是生產者和消費者。
BlockingQueue
根據對於不能立即滿足但可能在將來某一時刻可以滿足的操作,提供了不同的處理方法,進而導致眾多的 api
操作:
Throws exception | Special value | Blocks | Times out | |
Insert | add(e) | offer(e) | put(e) | offer(e, time, unit) |
Remove | remove() | poll() | take() | poll(time, unit) |
Examine | element() | peek()} | not applicable | not applicable |
Throws exception
:指當阻塞佇列滿時候,再往佇列裡插入元素,會丟擲 IllegalStateException
異常。當佇列為空時,從佇列裡獲取元素時會丟擲 NoSuchElementException
異常
Special value
:插入方法會返回是否成功,成功則返回
true
。移除方法,則是從佇列裡拿出一個元素,如果沒有則返回
null
Blocks
:當阻塞佇列滿時,如果生產者執行緒往佇列裡 put
元素,佇列會一直阻塞生產者執行緒,直到拿到資料,或者響應中斷退出。當佇列空時,消費者執行緒試圖從佇列裡 take
元素,佇列也會阻塞消費者執行緒,直到佇列可用。
Time out
:當阻塞佇列滿時,佇列會阻塞生產者執行緒一段時間,如果超過一定的時間,生產者執行緒就會退出。
整體架構和類圖
Java
併發包根據不同的結構和功能提供了不同的阻塞佇列,整體類圖如下:

其中 BlockingQueue
有如下子類:
ArrayBlockingQueue DelayQueue PriorityBlockingQueue SynchronousQueue LinkedBlockingQueue
其中 BlockingDeque
有一個子類:
-
LinkedBlockingDeque
:一個由連結串列結構組成的雙向阻塞佇列。BlockingDeque
作為雙端佇列,針對頭部元素,還提供瞭如下方法:
First Element (Head) | ||||
Throws exception | Special value | Blocks | Times out | |
Insert | addFirst(e) | offerFirst(e) | putFirst(e) | offerFirst(e, time, unit) |
Remove | removeFirst() | pollFirst() | takeFirst() | pollFirst(time, unit) |
Examine | getFirst() | peekFirst() | not applicable | not applicable |
針對尾部元素
Last Element (Tail) | ||||
Throws exception | Special value | Blocks | Times out | |
Insert | addLast(e) | offerLast(e) | putLast(e) | offerLast(e, time, unit) |
Remove | removeLast() | pollLast() | takeLast() | pollLast(time, unit) |
Examine | getLast() | peekLast() | not applicable | not applicable |
使用示例
一個典型的生產者和消費者例項如下,一個 BlockingQueue
可以安全地與多個生產者和消費者一起使用, Producer
執行緒呼叫 NumerGenerator
. getNextNumber()
生成自增整數,不斷地寫入數字,然後 Consumer
迴圈消費。
package com.aidodoo.java.concurrent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; /** * Created by zhangkh on 2018/7/17. */ public class BlockingQueueDemo { public static void main(String[] args) throws InterruptedException { BlockingQueue queue = new ArrayBlockingQueue(1024,true); ExecutorService executorService = Executors.newFixedThreadPool(20); for (int i = 0; i < 5; i++) { executorService.submit(new Producer(queue)); } for (int i = 0; i < 3; i++) { executorService.submit(new Consumer(queue)); } Thread.sleep(30 * 1000L); executorService.shutdown(); } } class Producer implements Runnable { Logger logger = LoggerFactory.getLogger(Producer.class.getName()); protected BlockingQueue queue = null; public Producer(BlockingQueue queue) { this.queue = queue; } @Override public void run() { try { for(int i=0;i<3;i++){ int num = NumerGenerator.getNextNumber(); queue.put(num); Thread.sleep(1000); logger.info("{} producer put {}", Thread.currentThread().getName(), num); } } catch (InterruptedException e) { e.printStackTrace(); } } } class Consumer implements Runnable { Logger logger = LoggerFactory.getLogger(Consumer.class.getName()); protected BlockingQueue queue = null; public Consumer(BlockingQueue queue) { this.queue = queue; } @Override public void run() { try { while (true) { int ele = (int) queue.take(); logger.info("{} Consumer take {}", Thread.currentThread().getName(), ele); Thread.sleep(1000); } } catch (InterruptedException e) { e.printStackTrace(); } } } class NumerGenerator{ private static AtomicInteger count = new AtomicInteger(); public static Integer getNextNumber(){ return count.incrementAndGet(); } }
程式輸出如下:
18/09/10 14:34:33 INFO concurrent.Consumer: pool-1-thread-6 Consumer take 1 18/09/10 14:34:33 INFO concurrent.Consumer: pool-1-thread-7 Consumer take 2 18/09/10 14:34:33 INFO concurrent.Consumer: pool-1-thread-8 Consumer take 3 18/09/10 14:34:34 INFO concurrent.Producer: pool-1-thread-3 producer put 3 18/09/10 14:34:34 INFO concurrent.Producer: pool-1-thread-2 producer put 2 18/09/10 14:34:34 INFO concurrent.Producer: pool-1-thread-1 producer put 1 18/09/10 14:34:34 INFO concurrent.Producer: pool-1-thread-5 producer put 5 18/09/10 14:34:34 INFO concurrent.Producer: pool-1-thread-4 producer put 4 18/09/10 14:34:34 INFO concurrent.Consumer: pool-1-thread-6 Consumer take 4 18/09/10 14:34:34 INFO concurrent.Consumer: pool-1-thread-8 Consumer take 5 18/09/10 14:34:34 INFO concurrent.Consumer: pool-1-thread-7 Consumer take 6 18/09/10 14:34:35 INFO concurrent.Producer: pool-1-thread-3 producer put 6 18/09/10 14:34:35 INFO concurrent.Producer: pool-1-thread-1 producer put 8 18/09/10 14:34:35 INFO concurrent.Producer: pool-1-thread-2 producer put 7 18/09/10 14:34:35 INFO concurrent.Producer: pool-1-thread-5 producer put 9 18/09/10 14:34:35 INFO concurrent.Producer: pool-1-thread-4 producer put 10 18/09/10 14:34:35 INFO concurrent.Consumer: pool-1-thread-6 Consumer take 7 18/09/10 14:34:35 INFO concurrent.Consumer: pool-1-thread-8 Consumer take 8 18/09/10 14:34:35 INFO concurrent.Consumer: pool-1-thread-7 Consumer take 9 18/09/10 14:34:36 INFO concurrent.Producer: pool-1-thread-1 producer put 12 18/09/10 14:34:36 INFO concurrent.Producer: pool-1-thread-3 producer put 11 18/09/10 14:34:36 INFO concurrent.Producer: pool-1-thread-5 producer put 14 18/09/10 14:34:36 INFO concurrent.Producer: pool-1-thread-4 producer put 15 18/09/10 14:34:36 INFO concurrent.Producer: pool-1-thread-2 producer put 13 18/09/10 14:34:36 INFO concurrent.Consumer: pool-1-thread-6 Consumer take 10 18/09/10 14:34:36 INFO concurrent.Consumer: pool-1-thread-8 Consumer take 11 18/09/10 14:34:36 INFO concurrent.Consumer: pool-1-thread-7 Consumer take 12 18/09/10 14:34:37 INFO concurrent.Consumer: pool-1-thread-6 Consumer take 13 18/09/10 14:34:37 INFO concurrent.Consumer: pool-1-thread-7 Consumer take 14 18/09/10 14:34:37 INFO concurrent.Consumer: pool-1-thread-8 Consumer take 15
其他 BlockingQueue
子類的使用可參考對應的 Java
Api
。
原始碼分析
由於 BlockingQueue
相關的子類眾多,我們僅以 ArrayBlockingQueue
從原始碼角度分析相關實現。
構造方法
ArrayBlockingQueue
中定義的成員變數如下:
final Object[] items; int takeIndex; int putIndex; int count; final ReentrantLock lock; private final Condition notEmpty; private final Condition notFull; transient Itrs itrs = null
各變數的解釋如下,以便了解後續的程式碼:
items takeIndex putIndex count notEmpty notFull itrs
內部結構如下:
構造方法如下:
public ArrayBlockingQueue(int capacity) { this(capacity, false);//(1) } public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity];//(2) lock = new ReentrantLock(fair); notEmpty = lock.newCondition();//(3) notFull =lock.newCondition();//(4) } public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) { this(capacity, fair); final ReentrantLock lock = this.lock; lock.lock(); // Lock only for visibility, not mutual exclusion try { int i = 0; try { for (E e : c) {//(5) checkNotNull(e); items[i++] = e; } } catch (ArrayIndexOutOfBoundsException ex) { throw new IllegalArgumentException(); } count = i; putIndex = (i == capacity) ? 0 : i; } finally { lock.unlock(); } }
(1):預設情況下,非公平模式,即搶佔式
(2):陣列初始化
(3)/(4):條件變數初始化
(5):如果構造方法中,含有初始化集合的話,則將對應元素新增到內部陣列,並更改 count
和 putIndex
的值。
插入資料
插入資料,我們主要看 put()
方法的實現,重點看生產者和消費者插入和獲取資料時,執行緒何時阻塞,同時又何時喚醒。
public void put(E e) throws InterruptedException { checkNotNull(e);//(1) final ReentrantLock lock = this.lock;//(2) lock.lockInterruptibly(); try { while (count == items.length) notFull.await();//(3) enqueue(e); } finally { lock.unlock();//(4) } } private void enqueue(E x) { final Object[] items = this.items; items[putIndex] = x;//(5) if (++putIndex == items.length)//(6) putIndex = 0; count++;//(7) notEmpty.signal();//(8) }
1
):非空檢查,插入的元素不能為
null
,否則丟擲
NullPointerException
( 2
):獲取互斥鎖
( 3
):如果當前佇列的元素個數等於佇列總長度,即佇列已滿,則通過條件變數,釋放和 notFull
相關的鎖,當前執行緒阻塞。當前執行緒喚醒的條件如下:
- 其他某個執行緒呼叫此
Condition
的signal()
方法,並且碰巧將當前執行緒選為被喚醒的執行緒; - 或者其他某個執行緒呼叫此
Condition
的signalAll()
方法; - 或者其他某個執行緒中斷當前執行緒,且支援中斷執行緒的掛起;
- 或者發生“虛假喚醒”
(5):如果佇列未滿,則將元素新增的 putIndex
索引的位置
putIndex
增加
1
後和佇列長度相等,即已到達佇列尾部,則
putIndex
置
0
(7):佇列已有元素數量加
1
(8):通知 notEmpty
條件變數,喚醒等待獲取元素的執行緒
(4):釋放互斥鎖
可以看到 ArrayBlockingQueue
每次插入元素後,都會去喚醒等待獲取元素的執行緒。
獲取資料
take()
方法原始碼如下:
public E take() throws InterruptedException { final ReentrantLock lock = this.lock;//(1) lock.lockInterruptibly(); try { while (count == 0) notEmpty.await();//(2) return dequeue(); } finally { lock.unlock();//(9) } } private E dequeue() { final Object[] items = this.items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex];//(3) items[takeIndex] = null;//(4) if (++takeIndex == items.length) takeIndex = 0;//(5) count--;//(6) if (itrs != null) itrs.elementDequeued();//(7) notFull.signal();//(8) return x; }
(1):獲取互斥鎖
(2):如果 count
為 0
,即佇列為空,則釋放互斥鎖,然後掛起當前執行緒
takeIndex
索引到陣列中獲取具體的值,並賦值給
x
(4):賦值完成後, takeIndex
索引位置資料置 null
,便於回收
takeIndex
加
1
,然後和佇列長度比較,如果相等,即已經讀取到佇列尾部,
takeIndex
置
0
(6):獲取後,將佇列元素個數
count
減
1
(7):維護和 queue
相關的迭代器
(8):喚醒等待插入元素的執行緒
(9):釋放互斥鎖
可以看到 ArrayBlockingQueue
每次獲取元素後,都會喚醒等待插入元素的執行緒。
迭代器
在分析原始碼前,我們先看在一個迭代器的示例
package com.aidodoo.java.concurrent; import java.util.Iterator; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; /** * Created by zhangkh on 2018/9/10. */ public class ArrayBlockingQueueIterDemo { public static void main(String[] args) throws InterruptedException{ BlockingQueue<String> queue=new ArrayBlockingQueue(5); queue.put("hadoop"); queue.put("spark"); queue.put("storm"); queue.put("flink"); Iterator<String> iterator1 = queue.iterator(); System.out.println( queue.take()); System.out.println(queue.take()); System.out.println(queue.take()); System.out.println(); while(iterator1.hasNext()) { System.out.println(iterator1.next()); } System.out.println(); Iterator<String> iterator2 = queue.iterator(); while(iterator2.hasNext()) { System.out.println(iterator2.next()); } } }
程式輸出如下:
hadoop spark storm hadoop flink flink
我們結合這個示例來具體分析資料插入和獲取時,內部成員變數的值
當分別插入 hadoop
、 spark
、 storm
、 flink
四個元素後,內部變數的值如下:

此時, ArrayBlockingQueue
的成員變數的值 itrs
為 null
。
呼叫 iterator()
方法後,原始碼如下:
public Iterator<E> iterator() { return new Itr();//(1) } Itr() { lastRet = NONE; final ReentrantLock lock = ArrayBlockingQueue.this.lock; lock.lock();//(2) try { if (count == 0) {//(3) cursor = NONE; nextIndex = NONE; prevTakeIndex = DETACHED; } else { final int takeIndex = ArrayBlockingQueue.this.takeIndex; prevTakeIndex = takeIndex; nextItem = itemAt(nextIndex = takeIndex);//(4) cursor = incCursor(takeIndex);//(5) if (itrs == null) { itrs = new Itrs(this);//(6) } else { itrs.register(this);//(7) itrs.doSomeSweeping(false); } prevCycles = itrs.cycles; } } finally { lock.unlock();//(8) }
}
(1):呼叫內部類 Itr
的構造方法
(2):獲取外部類即 ArrayBlockingQueue
的鎖
(3):沒有沒有元素,初始化變數值。內部類 Itr
的成員變數如下:
/** Index to look for new nextItem; NONE at end */ private int cursor; /** Element to be returned by next call to next(); null if none */ private E nextItem; /** Index of nextItem; NONE if none, REMOVED if removed elsewhere */ private int nextIndex; /** Last element returned; null if none or not detached. */ private E lastItem; /** Index of lastItem, NONE if none, REMOVED if removed elsewhere */ private int lastRet; /** Previous value of takeIndex, or DETACHED when detached */ private int prevTakeIndex; /** Previous value of iters.cycles */ private int prevCycles;
(4):將外部類的 takeIndex
賦值給內部類 nextIndex
,並獲取陣列具體的值賦值給 nextItem
(5):計算遊標 cursor
的下個值,其中 incCursor
方法如下:
private int incCursor(int index) { // assert lock.getHoldCount() == 1; if (++index == items.length) index = 0; if (index == putIndex) index = NONE; return index; }
(6):註冊,主要是維護連結串列
(7):清理itrs
(8):釋放外部類的互斥鎖
在上面的示例中,呼叫 iterator()
方法後, Itr
的內部變數值如下:

由於後面三次呼叫了 queue
. take()
,依次輸出 hadoop
、 spark
、 storm
後,相關成員變數的值見圖片標識,重點關注 takeIndex
= 3
。
當呼叫 next()
方法時,程式碼如下:
public E next() { final E x = nextItem; if (x == null) throw new NoSuchElementException(); final ReentrantLock lock = ArrayBlockingQueue.this.lock; lock.lock(); try { if (!isDetached())//(1) incorporateDequeues(); lastRet = nextIndex; final int cursor = this.cursor; if (cursor >= 0) { nextItem = itemAt(nextIndex = cursor); this.cursor = incCursor(cursor); } else { nextIndex = NONE; nextItem = null; } } finally { lock.unlock(); } return x; }
其中(1)處的 isDetached
方法如下
boolean isDetached() { // assert lock.getHoldCount() == 1; return prevTakeIndex < 0; }
由於我們示例中初始化 Itr
的時候的 prevTakeIndex
為 0
,故 isDetached
返回為 false
,程式將呼叫 incorporateDequeues
方法,根據註釋我們也知道,該方法主要是調整和迭代器相關的內部索引。
/** * Adjusts indices to incorporate all dequeues since the last * operation on this iterator.Call only from iterating thread. */ private void incorporateDequeues() { final int cycles = itrs.cycles; final int takeIndex = ArrayBlockingQueue.this.takeIndex; final int prevCycles = this.prevCycles; final int prevTakeIndex = this.prevTakeIndex; if (cycles != prevCycles || takeIndex != prevTakeIndex) { final int len = items.length; // how far takeIndex has advanced since the previous // operation of this iterator long dequeues = (cycles - prevCycles) * len + (takeIndex - prevTakeIndex); // Check indices for invalidation if (invalidated(lastRet, prevTakeIndex, dequeues, len)) lastRet = REMOVED; if (invalidated(nextIndex, prevTakeIndex, dequeues, len)) nextIndex = REMOVED; if (invalidated(cursor, prevTakeIndex, dequeues, len)) cursor = takeIndex; if (cursor < 0 && nextIndex < 0 && lastRet < 0) detach(); else { this.prevCycles = cycles; this.prevTakeIndex = takeIndex; } } }
注意 cursor
= takeIndex
這句程式碼,將外部內的 takeIndex
賦值給 cursor
,這樣子將佇列和迭代器資料讀取進行了同步。
對於 iterator1
,第一次呼叫 next()
方法時, cursor
被賦值為 3
首先將 nextItem
的值保持在 x
變數中,即 hadoop
字串。
然後設定 nextItem
和 cursor
的值
nextItem = itemAt(nextIndex = cursor); this.cursor = incCursor(cursor);
設定完成後, nextItem
為 flink
, cursor
為- 1
。
最後返回儲存在 x
變數中的值,即返回 hadoop
字串。
第二次呼叫 next()
方法時,輸出的值即上次儲存的 nextItem
值,即 flink
字串。
迭代器執行過程中,相關變數內容如下:

至於 iterator2
迭代器,各位可以自己去分析,不再贅述。
本文主要以例項講解 Semaphore
、阻塞佇列,並分析了相關核心原始碼實現。
本文參考
關於作者
愛程式設計、愛鑽研、愛分享、愛生活
關注分散式、高併發、資料探勘
如需捐贈,請掃碼
