Java並發(6)- CountDownLatch、Semaphore與AQS
引言
上一篇文章中詳細分析了基於AQS的ReentrantLock原理,ReentrantLock通過AQS中的state變量0和1之間的轉換代表了獨占鎖。那麽可以思考一下,當state變量大於1時代表了什麽?J.U.C中是否有基於AQS的這種實現呢?如果有,那他們都是怎麽實現的呢?這些疑問通過詳細分析J.U.C中的Semaphore與CountDownLatch類後,將會得到解答。
- Semaphore與CountDownLatch的共享邏輯
- Semaphore與CountDownLatch的使用示例
2.1 Semaphore的使用
2.2 CountDownLatch的使用 - 源碼分析
3.1 AQS中共享鎖的實現
3.3 CountDownLatch源碼分析 - 總結
1. Semaphore與CountDownLatch的共享方式
獨占鎖意味著只能有一個線程獲取鎖,其他的線程在鎖被占用的情況下都必須等待鎖釋放後才能進行下一步操作。由此類推,共享鎖是否意味著可以由多個線程同時使用這個鎖,不需要等待呢?如果是這樣,那鎖的意義也就不存在了。在J.U.C中共享意味著有多個線程可以同時獲取鎖,但這個多個是有限制的,並不是無限個,J.U.C中通過Semaphore與CountDownLatch來分別實現了兩種有限共享鎖。
Semaphore又叫信號量,他通過一個共享的’信號包‘來給每個使用他的線程來分配信號,當信號包中的信號足夠時,線程可以獲取鎖,反之,信號包中信號不夠了,則不能獲取到鎖,需要等待足夠的信號被釋放,才能獲取。
CountDownLatch又叫計數器,他通過一個共享的計數總量來控制線程鎖的獲取,當計數器總量大於0時,線程將被阻塞,不能夠獲取鎖,只有當計數器總量為0時,所有被阻塞的線程同時被釋放。
可以看到Semaphore與CountDownLatch都有一個共享總量,這個共享總量就是通過state來實現的。
2. Semaphore與CountDownLatch的使用示例
在詳細分析Semaphore與CountDownLatch的原理之前,先來看看他們是怎麽使用的,這樣方便後續我們理解他們的原理。先知道他是什麽?然後再問為什麽?下面通過兩個示例來詳細說明Semaphore與CountDownLatch的使用。
2.1 Semaphore的使用
//初始化10個信號量在信號包中,讓ABCD4個線程分別去獲取
public static void main(String[] args) throws InterruptedException {
Semaphore semaphore = new Semaphore(10);
SemaphoreTest(semaphore);
}
private static void SemaphoreTest(final Semaphore semaphore) throws InterruptedException {
//線程A初始獲取了4個信號量,然後分3次釋放了這4個信號量
Thread threadA = new Thread(new Runnable() {
@Override
public void run() {
try {
semaphore.acquire(4);
System.out.println(Thread.currentThread().getName() + " get 4 semaphore");
Thread.sleep(2000);
System.out.println(Thread.currentThread().getName() + " release 1 semaphore");
semaphore.release(1);
Thread.sleep(2000);
System.out.println(Thread.currentThread().getName() + " release 1 semaphore");
semaphore.release(1);
Thread.sleep(2000);
System.out.println(Thread.currentThread().getName() + " release 2 semaphore");
semaphore.release(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
threadA.setName("threadA");
//線程B初始獲取了5個信號量,然後分2次釋放了這5個信號量
Thread threadB = new Thread(new Runnable() {
@Override
public void run() {
try {
semaphore.acquire(5);
System.out.println(Thread.currentThread().getName() + " get 5 semaphore");
Thread.sleep(2000);
System.out.println(Thread.currentThread().getName() + " release 2 semaphore");
semaphore.release(2);
Thread.sleep(2000);
System.out.println(Thread.currentThread().getName() + " release 3 semaphore");
semaphore.release(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
threadB.setName("threadB");
//線程C初始獲取了4個信號量,然後分1次釋放了這4個信號量
Thread threadC = new Thread(new Runnable() {
@Override
public void run() {
try {
semaphore.acquire(4);
System.out.println(Thread.currentThread().getName() + " get 4 semaphore");
Thread.sleep(1000);
System.out.println(Thread.currentThread().getName() + " release 4 semaphore");
semaphore.release(4);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
threadC.setName("threadC");
//線程D初始獲取了10個信號量,然後分1次釋放了這10個信號量
Thread threadD = new Thread(new Runnable() {
@Override
public void run() {
try {
semaphore.acquire(10);
System.out.println(Thread.currentThread().getName() + " get 10 semaphore");
Thread.sleep(1000);
System.out.println(Thread.currentThread().getName() + " release 10 semaphore");
semaphore.release(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
threadD.setName("threadD");
//線程A和線程B首先分別獲取了4個和5個信號量,總信號量變為了1個
threadA.start();
threadB.start();
Thread.sleep(1);
//線程C嘗試獲取4個發現不夠則等待
threadC.start();
Thread.sleep(1);
//線程D嘗試獲取10個發現不夠則等待
threadD.start();
}
執行結果如下:
threadB get 5 semaphore
threadA get 4 semaphore
threadA release 1 semaphore
threadB release 2 semaphore
threadC get 4 semaphore
threadA release 1 semaphore
threadC release 4 semaphore
threadB release 3 semaphore
threadA release 2 semaphore
threadD get 10 semaphore
threadD release 10 semaphore
可以看到threadA和threadB在獲取了9個信號量之後threadC和threadD之後等待信號量足夠時才能繼續往下執行。而threadA和threadB在信號量足夠時是可以同時執行的。
其中有一個問題,當threadD排隊在threadC之前時,信號量如果被釋放了4個,threadC會先於threadD執行嗎?還是需要排隊等待呢?這個疑問在詳細分析了Semaphore的源碼之後再來給大家答案。
2.2 CountDownLatch的使用
//初始化計數器總量為2
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(2);
CountDownLatchTest(countDownLatch);
}
private static void CountDownLatchTest(final CountDownLatch countDownLatch) throws InterruptedException {
//threadA嘗試執行,計數器為2被阻塞
Thread threadA = new Thread(new Runnable() {
@Override
public void run() {
try {
countDownLatch.await();
System.out.println(Thread.currentThread().getName() + " await");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
threadA.setName("threadA");
//threadB嘗試執行,計數器為2被阻塞
Thread threadB = new Thread(new Runnable() {
@Override
public void run() {
try {
countDownLatch.await();
System.out.println(Thread.currentThread().getName() + " await");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
threadB.setName("threadB");
//threadC在1秒後將計數器數量減1
Thread threadC = new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(1000);
countDownLatch.countDown();
System.out.println(Thread.currentThread().getName() + " countDown");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
threadC.setName("threadC");
//threadD在5秒後將計數器數量減1
Thread threadD = new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(5000);
countDownLatch.countDown();
System.out.println(Thread.currentThread().getName() + " countDown");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
threadD.setName("threadD");
threadA.start();
threadB.start();
threadC.start();
threadD.start();
}
執行結果如下:
threadC countDown
threadD countDown
threadA await
threadB await
threadA和threadB在嘗試執行時由於計數器總量為2被阻塞,當threadC和threadD將計數器總量減為0後,threadA和threadB同時開始執行。
總結一下:Semaphore就像旋轉壽司店,共有10個座位,當座位有空余時,等待的人就可以坐上去。如果有只有2個空位,來的是一家3口,那就只有等待。如果來的是一對情侶,就可以直接坐上去吃。當然如果同時空出5個空位,那一家3口和一對情侶可以同時上去吃。CountDownLatch就像大型商場裏面的臨時遊樂場,每一場遊樂的時間過後等待的人同時進場玩,而一場中間會有不愛玩了的人隨時出來,但不能進入,一旦所有進入的人都出來了,新一批人就可以同時進場。
3. 源碼分析
明白了Semaphore與CountDownLatch是做什麽的,怎麽使用的。接下來就來看看Semaphore與CountDownLatch底層時怎麽實現這些功能的。
3.1 AQS中共享鎖的實現
上篇文章通過對ReentrantLock的分析,得倒了AQS中實現獨占鎖的幾個關鍵方法:
//狀態量,獨占鎖在0和1之間切換
private volatile int state;
//調用tryAcquire獲取鎖,獲取失敗後加入隊列中掛起等操作,AQS中實現
public final void acquire(int arg);
//獨占模式下嘗試獲取鎖,ReentrantLock中實現
protected boolean tryAcquire(int arg);
//調用tryRelease釋放鎖以及恢復線程等操作,AQS中實現
public final boolean release(int arg);
//獨占模式下嘗試釋放鎖,ReentrantLock中實現
protected boolean tryRelease(int arg);
其中具體的獲取和釋放獨占鎖的邏輯都放在ReentrantLock中自己實現,AQS中負責管理獲取或釋放獨占鎖成功失敗後需要具體處理的邏輯。那麽共享鎖的實現是否也是遵循這個規律呢?由此我們在AQS中發現了以下幾個類似的方法:
//調用tryAcquireShared獲取鎖,獲取失敗後加入隊列中掛起等操作,AQS中實現
public final void acquireShared(int arg);
//共享模式下嘗試獲取鎖
protected int tryAcquireShared(int arg);
//調用tryReleaseShared釋放鎖以及恢復線程等操作,AQS中實現
public final boolean releaseShared(int arg);
//共享模式下嘗試釋放鎖
protected boolean tryReleaseShared(int arg);
共享鎖和核心就在上面4個關鍵方法中,先來看看Semaphore是怎麽調用上述方法來實現共享鎖的。
3.2 Semaphore源碼分析
首先是Semaphore的構造方法,同ReentrantLock一樣,他有兩個構造方法,這樣也是為了實現公平共享鎖和非公平共享鎖,大家可能有疑問,既然是共享鎖,為什麽還分公平和非公平的呢?這就回到了上面那個例子後面的疑問,前面有等待的線程時,後來的線程是否可以直接獲取信號量,還是一定要排隊。等待當然是公平的,插隊就是非公平的。
還是用旋轉壽司的例子來說:現在只有2個空位,已經有一家3口在等待,這時來了一對情侶,公平共享鎖的實現就是這對情侶必須等待,只到一家3口上桌之後才輪到他們,而非公平共享鎖的實現是可以讓這對情況直接去吃,因為剛好有2個空位,讓一家3口繼續等待(好像是很不公平......),這種情況下非公平共享鎖的好處就是可以最大化壽司店的利潤(好像同時也得罪了等待的顧客......),也是Semaphore默認的實現方式。
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
Semaphore的例子中使用了兩個核心方法acquire和release,分別調用了AQS中的acquireSharedInterruptibly和releaseShared方法:
//獲取permits個信號量
public void acquire(int permits) throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireSharedInterruptibly(permits);
}
//釋放permits個信號量
public void release(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.releaseShared(permits);
}
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0) //嘗試獲取arg個信號量
doAcquireSharedInterruptibly(arg); //獲取信號量失敗時排隊掛起
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) { //嘗試釋放arg個信號量
doReleaseShared();
return true;
}
return false;
}
Semaphore在獲取和釋放信號量的流程都是通過AQS來實現,具體怎麽算獲取成功或釋放成功則由Semaphore本身實現。
//公平共享鎖嘗試獲取acquires個信號量
protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors()) //前面是否有排隊,有則返回獲取失敗
return -1;
int available = getState(); //剩余的信號量(旋轉壽司店剩余的座位)
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining)) // 剩余信號量不夠,夠的情況下嘗試獲取(旋轉壽司店座位不夠,或者同時來兩對情況搶座位)
return remaining;
}
}
//非公平共享鎖嘗試獲取acquires個信號量
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState(); //剩余的信號量(旋轉壽司店剩余的座位)
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining)) // 剩余信號量不夠,夠的情況下嘗試獲取(旋轉壽司店座位不夠,或者同時來兩對情侶搶座位)
return remaining;
}
}
可以看到公平共享鎖和非公平共享鎖的區別就在是否需要判斷隊列中是否有已經等待的線程。公平共享鎖需要先判斷,非公平共享鎖直接插隊,盡管前面已經有線程在等待。
為了驗證這個結論,稍微修改下上面的示例:
threadA.start();
threadB.start();
Thread.sleep(1);
threadD.start(); //threadD已經在排隊
Thread.sleep(3500);
threadC.start(); //3500毫秒後threadC來插隊
結果輸出:
threadB get 5 semaphore
threadA get 4 semaphore
threadB release 2 semaphore
threadA release 1 semaphore
threadC get 4 semaphore //threadC先與threadD獲取到信號量
threadA release 1 semaphore
threadB release 3 semaphore
threadC release 4 semaphore
threadA release 2 semaphore
threadD get 10 semaphore
threadD release 10 semaphore
這個示例很好的說明了當為非公平鎖時會先嘗試獲取共享鎖,然後才排隊。
當獲取信號量失敗之後會去排隊,排隊這個操作通過AQS中的doAcquireSharedInterruptibly方法來實現:
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED); //加入等待隊列
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor(); //獲取當前節點的前置節點
if (p == head) {
int r = tryAcquireShared(arg); //前置節點是頭節點時,說明當前節點是第一個掛起的線程節點,再次嘗試獲取共享鎖
if (r >= 0) {
setHeadAndPropagate(node, r); //與ReentrantLock不同的地方:獲取共享鎖成功設置頭節點,同時通知下一個節點
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) && //非頭節點或者獲取鎖失敗,檢查節點狀態,查看是否需要掛起線程
parkAndCheckInterrupt()) //掛起線程,當前線程阻塞在這裏!
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
這一段代碼和ReentrantLock中的acquireQueued(addWaiter(Node.EXCLUSIVE), arg)方法基本一樣,說下兩個不同的地方。一是加入等待隊列時這裏加入的是Node.SHARED類型的節點。二是獲取鎖成功後會通知下一個節點,也就是喚醒下一個線程。以旋轉壽司店的例子為例,前面同時走了5個客人,空余5個座位,一家3口坐進去之後會告訴後面的一對情侶,讓他們也坐進去,這樣就達到了共享的目的。shouldParkAfterFailedAcquire和parkAndCheckInterrupt方法在上一篇文章中都有詳細說明,這裏就做解釋了。
再來看看releaseShared方法時怎麽釋放信號量的,首先調用tryReleaseShared來嘗試釋放信號量,釋放成功後調用doReleaseShared來判斷是否需要喚醒後繼線程:
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow //釋放信號量過多
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next)) //cas操作設置新的信號量
return true;
}
}
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) { //SIGNAL狀態下喚醒後繼節點
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h); //喚醒後繼節點
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
釋放的邏輯很好理解,相比ReentrantLock只是在state的數量上有點差別。
3.3 CountDownLatch源碼分析
CountDownLatch相比Semaphore在實現邏輯上要簡單的多,同時他也沒有公平和非公平的區別,因為當計數器達到0的時候,所有等待的線程都會釋放,不為0的時候,所有等待的線程都會阻塞。直接來看看CountDownLatch的兩個核心方法await和countDown。
public void await() throws InterruptedException {
//和Semaphore的不同在於參數為1,其實這個參數對CountDownLatch來說沒什麽意義,因為後面CountDownLatch的tryAcquireShared實現是通過getState() == 0來判斷的
sync.acquireSharedInterruptibly(1);
}
public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
//這裏加入了一個等待超時控制,超過時間後直接返回false執行後面的代碼,不會長時間阻塞
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
public void countDown() {
sync.releaseShared(1); //每次釋放1個計數
}
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0) //嘗試獲取arg個信號量
doAcquireSharedInterruptibly(arg); //獲取信號量失敗時排隊掛起
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1; //奠定了同時獲取鎖的基礎,無論State初始為多少,只能計數等於0時觸發
}
和Semaphore區別有兩個,一是State每次只減少1,同時只有為0時才釋放所有等待線程。二是提供了一個超時等待方法。acquireSharedInterruptibly方法跟Semaphore一樣,就不細說了,這裏重點說下tryAcquireSharedNanos方法。
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquireShared(arg) >= 0 ||
doAcquireSharedNanos(arg, nanosTimeout);
}
//最小自旋時間
static final long spinForTimeoutThreshold = 1000L;
private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
final long deadline = System.nanoTime() + nanosTimeout; //計算了一個deadline
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return true;
}
}
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L) //超時後直接返回false,繼續執行
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold) //大於最小cas操作時間則掛起線程
LockSupport.parkNanos(this, nanosTimeout); //掛起線程也有超時限制
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
重點看標了註釋的幾行代碼,首先計算了一個超時時間,當超時後直接退出等待,繼續執行。如果未超時並且大於最小的cas操作時間,這裏定義的是1000ns,則掛起,同時掛起操作也有超時限制。這樣就實現了超時等待。
4.總結
至此關於AQS的共享鎖的兩個實現Semaphore與CountDownLatch就分析完了,他們與非共享最大的區別就在於是否能多個線程同時獲取鎖。看完後希望大家能對Semaphore與CountDownLatch有深刻的理解,不明白的時候想想旋轉壽司店和遊樂場的例子,如果對大家有幫助,覺得寫的好的話,可以點個贊,當然更希望大家能積極指出文中的錯誤和提出積極的改進意見。
Java並發(6)- CountDownLatch、Semaphore與AQS