Java多執行緒10 同步工具類CountDownLatch
前言
CountDownLatch是一個同步工具類,它允許一個或多個執行緒一直等待,直到其他執行緒執行完後再執行。例如,應用程式的主執行緒希望在負責啟動框架服務的執行緒已經啟動所有框架服務之後執行。
1 CountDownLatch主要方法:
void await():如果當前count大於0,當前執行緒將會wait,直到count等於0或者中斷。PS:當count等於0的時候,再去呼叫await(),
執行緒將不會阻塞,而是立即執行。後面可以通過原始碼分析得到。
boolean await(long timeout, TimeUnit unit):使當前執行緒在鎖存器倒計數至零之前一直等待,除非執行緒被中斷或超出了指定的等待時間。
void countDown(): 遞減鎖存器的計數,如果計數到達零,則釋放所有等待的執行緒。
long getCount() :獲得計數的數量
2 CountDownLatch使用例子
public class CountDownLatchTest { private static finalint N = 4; public static void main(String[] args) { final CountDownLatch latch = new CountDownLatch(4); for(int i=0;i<N;i++) { new Thread(){ public void run() { try { System.out.println("子執行緒"+Thread.currentThread().getName()+"正在執行"); Thread.sleep(3000); System.out.println("子執行緒"+Thread.currentThread().getName()+"執行完畢"); latch.countDown(); System.out.println("剩餘計數"+latch.getCount()); } catch (InterruptedException e) { e.printStackTrace(); } }; }.start(); } try { System.out.println("等待"+N+"個子執行緒執行完畢..."); latch.await(); System.out.println(N+"個子執行緒已經執行完畢"); System.out.println("繼續執行主執行緒"); } catch (InterruptedException e) { e.printStackTrace(); } } }
子執行緒Thread-1正在執行 子執行緒Thread-3正在執行 子執行緒Thread-2正在執行 等待4個子執行緒執行完畢... 子執行緒Thread-0正在執行 子執行緒Thread-3執行完畢 子執行緒Thread-2執行完畢 剩餘計數2 子執行緒Thread-1執行完畢 剩餘計數1 子執行緒Thread-0執行完畢 剩餘計數3 剩餘計數0 4個子執行緒已經執行完畢 繼續執行主執行緒
3 CountDownLatch原始碼分析
CountDownLatch是通過計數器的方式來實現,計數器的初始值為執行緒的數量。每當一個執行緒完成了自己的任務之後,就會對計數器減1,當計數器的值為0時,表示所有執行緒完成了任務,此時等待在閉鎖上的執行緒才繼續執行,從而達到等待其他執行緒完成任務之後才繼續執行的目的。

image.png
建構函式
public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); }
通過傳入一個數值來建立一個CountDownLatch,數值表示執行緒可以從等待狀態恢復,countDown方法必須被呼叫的次數
countDown方法
public void countDown() { sync.releaseShared(1); }
執行緒呼叫此方法對count進行減1。當count本來就為0,此方法不做任何操作,當count比0大,呼叫此方法進行減1,當new count為0,釋放所有等待當執行緒。
countDown方法的內部實現
/** * Decrements the count of the latch, releasing all waiting threads if * the count reaches zero. * * <p>If the current count is greater than zero then it is decremented. * If the new count is zero then all waiting threads are re-enabled for * thread scheduling purposes. * * <p>If the current count equals zero then nothing happens. */ public void countDown() { sync.releaseShared(1); } public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared();//釋放所有正在等待的執行緒節點 return true; } return false; } protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } } private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue;// loop to recheck cases unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue;// loop on failed CAS } if (h == head)// loop if head changed break; } }
await方法
(1)不帶引數
public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); }
呼叫此方法時,當count為0,直接返回true,當count比0大,執行緒會一直等待,直到count的值變為0,或者執行緒被中斷(interepted,此時會丟擲中斷異常)。
(2)帶引數
public boolean await(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); }
呼叫此方法時,當count為0,直接返回true,當count比0大,執行緒會等待一段時間,等待時間內如果count的值變為0,返回true;當超出等待時間,返回false;或者等待時間內執行緒被中斷,此時會丟擲中斷異常。
await()方法的內部實現
public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); } /* 具體如下: 1、檢測中斷標誌位 2、呼叫tryAcquireShared方法來檢查AQS標誌位state是否等於0,如果state等於0,則說明不需要等待,立即返回,否則進行3 3、呼叫doAcquireSharedInterruptibly方法進入AQS同步佇列進行等待,並不斷的自旋檢測是否需要喚醒 */ public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); } /* 函式功能:根據AQS的狀態位state來返回值, 如果為state=0,返回 1 如果state=1,則返回-1 */ protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } /** * Acquires in shared interruptible mode. * @param arg the acquire argument */ private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) {//如果大於零,則說明需要喚醒 setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
4 CountDownLatch和CyclicBarrier區別
- CountDownLatch和CyclicBarrier都能夠實現執行緒之間的等待,只不過它們側重點不同:
- CountDownLatch一般用於某個執行緒A等待若干個其他執行緒執行完任務之後,它才執行;
- CyclicBarrier一般用於一組執行緒互相等待至某個狀態,然後這一組執行緒再同時執行;
- CountDownLatch是不能夠重用的,而CyclicBarrier是可以重用的。