Java併發程式設計之CountDownLatch詳解
簡介
閉鎖是一種同步工具類,可以延遲執行緒的進度,直到其到達終止狀態。閉鎖的作用相當於一扇門:在閉鎖到達結束狀態之前,這扇門一直是關閉的,並且沒有任何執行緒能夠通過,當達到結束狀態時,這扇門會開啟並允許所有的執行緒通過。當閉鎖到達結束狀態後,將不會再改變狀態,因此這扇門將永遠保持開啟狀態。閉鎖可以確保某些活動直到其他活動都完成之後才繼續執行。
CountDownLatch是一種靈活的閉鎖實現,它允許一個或多個執行緒等待一組事件的產生。閉鎖狀態包括一個計數器,該計數器初始化為一個正數,表示需要等待的事件數量。countDown方法遞減計數器,表示有一個事件已經發生了,而await方法會一直阻塞直到計數器為0,或者等待中的執行緒中斷,或者等待超時。
CountDownLatch原始碼詳解
CountDownLatch類圖如下:
從類圖中可以看出,CountDownLatch內部依賴Sync實現,而Sync繼承自AQS。CountDownLatch僅提供了一個構造方法:
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
Sync是CountDownLatch的靜態內部類,其定義也比較簡單,如下所示:
private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L; Sync(int count) { // 設定同步狀態值 setState(count); } int getCount() { // 獲取同步狀態值 return getState(); } // 共享式獲取同步狀態 protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } // 共享式釋放同步狀態 protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } } }
await()方法
CountDownLatch提供了await()方法來使當前執行緒一直等待,直到計數器的值減為0,或者執行緒被中斷,該方法定義如下:
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
await()方法呼叫了AQS的共享式相應中斷獲取同步狀態的方法,acquireSharedInterruptibly(int),如下所示:
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
Sync類重寫了tryAcquireShared(int)方法:
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
可以看到,只有當計數器(即同步狀態)值為0時,才返回1,即當前執行緒獲取到了同步狀態,在這裡表示等待執行緒可以繼續執行,若計數器值不是0,則當前執行緒會呼叫doAcquireSharedInterruptibly(int)方法,一直自旋去嘗試獲取同步狀態:
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
countDown()方法
CountDownLatch提供了countDown()方法遞減計數器的值,如果計數到達0,則釋放所有等待的執行緒,該方法定義如下:
public void countDown() {
sync.releaseShared(1);
}
countDown()方法呼叫了AQS的releaseShared(int)方法來釋放共享鎖同步狀態:
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
Sync類重寫了releaseShared(int)方法:
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
// 獲取同步狀態
int c = getState();
// 同步狀態為0,則直接返回
if (c == 0)
return false;
// 計算並更新同步狀態
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
應用程式示例
我們看一個CountDownLatch的應用示例:
public class CountDownLatchTest {
// 自定義工作執行緒
private static class Worker extends Thread {
private CountDownLatch countDownLatch;
public Worker(CountDownLatch countDownLatch) {
this.countDownLatch = countDownLatch;
}
@Override
public void run() {
super.run();
try {
countDownLatch.await();
System.out.println(Thread.currentThread().getName() + "開始執行");
// 工作執行緒開始處理,這裡用Thread.sleep()來模擬業務處理
Thread.sleep(1000);
System.out.println(Thread.currentThread().getName() + "執行完畢");
} catch (Exception e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(1);
for (int i = 0; i < 3; i++) {
System.out.println("建立工作執行緒" + i);
Worker worker = new Worker(countDownLatch);
worker.start();
}
// 工作執行緒需要等待主執行緒準備操作完畢才可以執行,這裡用Thread.sleep()來模擬準備操作
Thread.sleep(1000);
System.out.println("主執行緒準備完畢");
countDownLatch.countDown();
}
}
執行結果(不唯一):
建立工作執行緒0
建立工作執行緒1
建立工作執行緒2
主執行緒準備完畢
Thread-0開始執行
Thread-2開始執行
Thread-1開始執行
Thread-0執行完畢
Thread-2執行完畢
Thread-1執行完畢
在上述程式碼中,我們自定義的工作執行緒必須要等主執行緒準備完畢才可以執行,我們可以使用CountDownLatch類來幫助我們完成。從程式的執行結果中也可以看出,3個工作執行緒確實是在主執行緒準備完畢後才開始執行。
相關部落格
參考資料
方騰飛:《Java併發程式設計的藝術》
Doug Lea:《Java併發程式設計實戰》