1. 程式人生 > >Java併發程式設計之CountDownLatch詳解

Java併發程式設計之CountDownLatch詳解

簡介

閉鎖是一種同步工具類,可以延遲執行緒的進度,直到其到達終止狀態。閉鎖的作用相當於一扇門:在閉鎖到達結束狀態之前,這扇門一直是關閉的,並且沒有任何執行緒能夠通過,當達到結束狀態時,這扇門會開啟並允許所有的執行緒通過。當閉鎖到達結束狀態後,將不會再改變狀態,因此這扇門將永遠保持開啟狀態。閉鎖可以確保某些活動直到其他活動都完成之後才繼續執行。

CountDownLatch是一種靈活的閉鎖實現,它允許一個或多個執行緒等待一組事件的產生。閉鎖狀態包括一個計數器,該計數器初始化為一個正數,表示需要等待的事件數量。countDown方法遞減計數器,表示有一個事件已經發生了,而await方法會一直阻塞直到計數器為0,或者等待中的執行緒中斷,或者等待超時。

CountDownLatch原始碼詳解

CountDownLatch類圖如下:

從類圖中可以看出,CountDownLatch內部依賴Sync實現,而Sync繼承自AQS。CountDownLatch僅提供了一個構造方法:

public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
}

Sync是CountDownLatch的靜態內部類,其定義也比較簡單,如下所示:

private static final class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 4982264981922014374L;

    Sync(int count) {
        // 設定同步狀態值
        setState(count);
    }

    int getCount() {
        // 獲取同步狀態值
        return getState();
    }

    // 共享式獲取同步狀態
    protected int tryAcquireShared(int acquires) {
        return (getState() == 0) ? 1 : -1;
    }

    // 共享式釋放同步狀態
    protected boolean tryReleaseShared(int releases) {
        // Decrement count; signal when transition to zero
        for (;;) {
            int c = getState();
            if (c == 0)
                return false;
            int nextc = c-1;
            if (compareAndSetState(c, nextc))
                return nextc == 0;
        }
    }
}

await()方法

CountDownLatch提供了await()方法來使當前執行緒一直等待,直到計數器的值減為0,或者執行緒被中斷,該方法定義如下:

public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

await()方法呼叫了AQS的共享式相應中斷獲取同步狀態的方法,acquireSharedInterruptibly(int),如下所示:

public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

Sync類重寫了tryAcquireShared(int)方法:

protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}

可以看到,只有當計數器(即同步狀態)值為0時,才返回1,即當前執行緒獲取到了同步狀態,在這裡表示等待執行緒可以繼續執行,若計數器值不是0,則當前執行緒會呼叫doAcquireSharedInterruptibly(int)方法,一直自旋去嘗試獲取同步狀態:

private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

countDown()方法

CountDownLatch提供了countDown()方法遞減計數器的值,如果計數到達0,則釋放所有等待的執行緒,該方法定義如下:

public void countDown() {
    sync.releaseShared(1);
}

countDown()方法呼叫了AQS的releaseShared(int)方法來釋放共享鎖同步狀態:

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

Sync類重寫了releaseShared(int)方法:

protected boolean tryReleaseShared(int releases) {
    // Decrement count; signal when transition to zero
    for (;;) {
        // 獲取同步狀態
        int c = getState();
        // 同步狀態為0,則直接返回
        if (c == 0)
            return false;
        // 計算並更新同步狀態
        int nextc = c-1;
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}

應用程式示例

我們看一個CountDownLatch的應用示例:

public class CountDownLatchTest {
	// 自定義工作執行緒
	private static class Worker extends Thread {
		private CountDownLatch countDownLatch;
		
		public Worker(CountDownLatch countDownLatch) {
			this.countDownLatch = countDownLatch;
		}
		
		@Override
		public void run() {
			super.run();
			
			try {
				countDownLatch.await();
				System.out.println(Thread.currentThread().getName() + "開始執行");
				// 工作執行緒開始處理,這裡用Thread.sleep()來模擬業務處理
				Thread.sleep(1000);
				System.out.println(Thread.currentThread().getName() + "執行完畢");
			} catch (Exception e) {
				e.printStackTrace();
			}
		}
	}
	
	public static void main(String[] args) throws InterruptedException {
		CountDownLatch countDownLatch = new CountDownLatch(1);
		
		for (int i = 0; i < 3; i++) {
			System.out.println("建立工作執行緒" + i);
			Worker worker = new Worker(countDownLatch);
			worker.start();
		}
		
		// 工作執行緒需要等待主執行緒準備操作完畢才可以執行,這裡用Thread.sleep()來模擬準備操作
		Thread.sleep(1000);
		System.out.println("主執行緒準備完畢");
		
		countDownLatch.countDown();
	}
}

執行結果(不唯一):

建立工作執行緒0
建立工作執行緒1
建立工作執行緒2
主執行緒準備完畢
Thread-0開始執行
Thread-2開始執行
Thread-1開始執行
Thread-0執行完畢
Thread-2執行完畢
Thread-1執行完畢

在上述程式碼中,我們自定義的工作執行緒必須要等主執行緒準備完畢才可以執行,我們可以使用CountDownLatch類來幫助我們完成。從程式的執行結果中也可以看出,3個工作執行緒確實是在主執行緒準備完畢後才開始執行。

相關部落格

參考資料

方騰飛:《Java併發程式設計的藝術》

Doug Lea:《Java併發程式設計實戰》