1. 程式人生 > >【搞定Java併發程式設計】第24篇:Java中的併發工具類之CountDownLatch

【搞定Java併發程式設計】第24篇:Java中的併發工具類之CountDownLatch

上一篇:Java中的阻塞佇列 BlockingQueue 詳解

本文目錄

1、CountDownLatch的基本概述

2、CountDownLatch的使用案例

3、CountDownLatch的原始碼分析


1、CountDownLatch的基本概述

CountDownLatch允許一個或多個執行緒等待其他執行緒完成操作。

CountDownLatch又稱為“閉鎖”,它是一個同步輔助類,在完成一組正在其他執行緒中執行的操作之前,它允許一個或多個執行緒一直等待,直到等待的執行緒操作完成或者等待超時,自己再開始執行。

CountDownLatch可以延遲執行緒的進度直到其他執行緒到達終止狀態,它可以用來確保某些任務在其他任務都完成後再繼續進行:

  • 確保某個計算在其需要的所有資源都被初始化之後再繼續執行;
  • 確保某個服務在其依賴的所有其他服務都已經啟動之後才啟動;
  • 等待直到某個操作所有參與者都準備就緒後再繼續執行。

另外它還提供了一個countDown方法來操作計數器的值,每呼叫一次countDown方法計數器都會減1,直到計數器的值減為0時就代表條件已成熟,所有因呼叫await方法而阻塞的執行緒都會被喚醒。

這就是CountDownLatch的內部機制,看起來很簡單,無非就是阻塞一部分執行緒讓其在達到某個條件之後再執行。但是CountDownLatch的應用場景卻比較廣泛,只要你腦洞夠大利用它就可以玩出各種花樣。最常見的一個應用場景是開啟多個執行緒同時執行某個任務,等到所有任務都執行完再統計彙總結果。下圖動態演示了閉鎖阻塞執行緒的整個過程。

上圖演示了有5個執行緒因呼叫await方法而被阻塞,它們需要等待計數器的值減為0才能繼續執行。計數器的初始值在構造閉鎖時被指定,後面隨著每次countDown方法的呼叫而減1。


2、CountDownLatch的使用案例

【案例1】

我們看下 Doug Lea 在 java doc 中給出的例子,這個例子非常實用,我們經常會寫這個程式碼。

假設我們有 N ( N > 0 ) 個任務,那麼我們會用 N 來初始化一個 CountDownLatch,然後將這個 latch 的引用傳遞到各個執行緒中,在每個執行緒完成了任務後,呼叫 latch.countDown() 代表完成了一個任務。呼叫 latch.await() 的方法的執行緒會阻塞,直到所有的任務完成。

package com.zju.CountDownLatch;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

public class Driver {

	public static void main(String[] args) throws InterruptedException {
		
		CountDownLatch doneSignal = new CountDownLatch(10);
		Executor e = Executors.newFixedThreadPool(4);
		
		// 建立10個任務,提交給執行緒池來執行
		for (int i = 0; i < 10; i++) {
			e.execute(new WorkerRunnable(doneSignal, i));
		}
		
		// 等待所有的任務都完成了,這個方法才會返回
		doneSignal.await();
	}
}

class WorkerRunnable implements Runnable{

	private final CountDownLatch doneSignal;
	private final int i;
	
	WorkerRunnable(CountDownLatch doneSignal, int i){
		this.doneSignal = doneSignal;
		this.i = i;
	}
	
	@Override
	public void run() {
		try {
			doWork(i);
			// 這個執行緒的任務完成了,呼叫 countDown 方法
			doneSignal.countDown();
		} catch (Exception e) {
		}
	}
	
	public void doWork(int i){
		System.out.println(i);
	}
}

所以說 CountDownLatch 非常實用,我們常常會將一個比較大的任務進行拆分,然後開啟多個執行緒來執行,等所有執行緒都執行完了以後,再往下執行其他操作。這裡例子中,只有 main 執行緒呼叫了 await 方法

我們再來看另一個例子,這個例子很典型,用了兩個 CountDownLatch:

class Driver { 
    void main() throws InterruptedException {
        CountDownLatch startSignal = new CountDownLatch(1);
        CountDownLatch doneSignal = new CountDownLatch(N);
 
        for (int i = 0; i < N; ++i) 
            new Thread(new Worker(startSignal, doneSignal)).start();
 
        // 這邊插入一些程式碼,確保上面的每個執行緒先啟動起來,才執行下面的程式碼。
        doSomethingElse();            // don't let run yet
        // 因為這裡 N == 1,所以,只要呼叫一次,那麼所有的 await 方法都可以通過
        startSignal.countDown();      
        doSomethingElse();
        // 等待所有任務結束
        doneSignal.await();           
    }
}
 
class Worker implements Runnable {
    private final CountDownLatch startSignal;
    private final CountDownLatch doneSignal;
 
    Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
        this.startSignal = startSignal;
        this.doneSignal = doneSignal;
    }
 
    public void run() {
        try {
            // 為了讓所有執行緒同時開始任務,我們讓所有執行緒先阻塞在這裡
            // 等大家都準備好了,再開啟這個門栓
            startSignal.await();
            doWork();
            doneSignal.countDown();
        } catch (InterruptedException ex) {
        } 
    }
 
    void doWork() { ...}
}

這個例子中,doneSignal 同第一個例子的使用,我們說說這裡的 startSignal。N 個新開啟的執行緒都呼叫了startSignal.await() 進行阻塞等待,它們阻塞在柵欄上,只有當條件滿足的時候(startSignal.countDown()),它們才能同時通過這個柵欄。

5

如果始終只有一個執行緒呼叫 await 方法等待任務完成,那麼 CountDownLatch 就會簡單很多,所以之後的原始碼分析讀者一定要在腦海中構建出這麼一個場景:有 m 個執行緒是做任務的,有 n 個執行緒在某個柵欄上等待這 m 個執行緒做完任務,直到所有 m 個任務完成後,n 個執行緒同時通過柵欄。

【案例2】

應用場景:在玩歡樂鬥地主時必須等待三個玩家都到齊才可以進行發牌。

package com.zju.CountDownLatch;

import java.util.concurrent.CountDownLatch;

public class Player extends Thread {

	private static int count = 1;
	private final int id = count++;
	private CountDownLatch latch;
	
	public Player(CountDownLatch latch){
		this.latch = latch;
	}
	
	@Override
	public void run() {
		System.out.println("玩家" + id + "已入場");
		latch.countDown();
	}

	public static void main(String[] args) throws InterruptedException {
		CountDownLatch latch = new CountDownLatch(3);
		System.out.println("牌局開始,等待玩家入場...");
		
		new Player(latch).start();
		new Player(latch).start();
		new Player(latch).start();
		
		latch.await();
		System.out.println("玩家已到齊,開始發牌!");
	}
}

執行結果:

執行結果顯示發牌操作一定是在所有玩家都入場後才進行。我們將23行的latch.await()註釋掉,對比下看看結果:

可以看到在註釋掉latch.await()這行之後,就不能保證在所有玩家入場後才開始發牌了。


3、CountDownLatch的原始碼分析

因為CountDownLatch的原始碼比較少,這裡直接全部貼出來,方便直觀感受下CountDownLatch的內部結構。可以發現CountDownLactch是基於AQS共享式鎖基礎上實現的。

public class CountDownLatch {
    
    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;

        Sync(int count) {
            setState(count);
        }

        int getCount() {
            return getState();
        }

        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

        protected boolean tryReleaseShared(int releases) {    
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }

    private final Sync sync;

    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }

    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    public boolean await(long timeout, TimeUnit unit)
        throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }
	
    public void countDown() {
        sync.releaseShared(1);
    }

    public long getCount() {
        return sync.getCount();
    }

    public String toString() {
        return super.toString() + "[Count = " + sync.getCount() + "]";
    }
}

首先來看下CountDownLatch的構造方法:

public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
}

CountDownLatch只有一個帶參構造器,必須傳入一個大於0的值作為計數器初始值,否則會報錯。可以看到在構造方法中只是去new了一個Sync物件並賦值給成員變數sync。和其他同步工具類一樣,CountDownLatch的實現依賴於AQS,它是AQS共享模式下的一個應用。CountDownLatch實現了一個內部類Sync並用它去繼承AQS,這樣就能使用AQS提供的大部分方法了。下面我們就來看一下Sync內部類的程式碼。

//同步器
private static final class Sync extends AbstractQueuedSynchronizer {

    // 構造器
    Sync(int count) {
        setState(count);
    }

    // 獲取當前同步狀態
    int getCount() {
        return getState();
    }

    // 嘗試獲取鎖
    // 返回負數:表示當前執行緒獲取失敗
    // 返回零值:表示當前執行緒獲取成功, 但是後繼執行緒不能再獲取了
    // 返回正數:表示當前執行緒獲取成功, 並且後繼執行緒同樣可以獲取成功
    protected int tryAcquireShared(int acquires) {
        return (getState() == 0) ? 1 : -1;
    }

    // 嘗試釋放鎖
    protected boolean tryReleaseShared(int releases) {
        for (;;) {
            // 獲取同步狀態
            int c = getState();
            // 如果同步狀態為0, 則不能再釋放了
            if (c == 0) {
                return false;
            }
            // 否則的話就將同步狀態減1
            int nextc = c-1;
            // 使用CAS方式更新同步狀態
            if (compareAndSetState(c, nextc)) {
                return nextc == 0;
            }
        }
    }
}

可以看到Sync的構造方法會將同步狀態的值設定為傳入的引數值。之後每次呼叫countDown方法都會將同步狀態的值減1,這也就是計數器的實現原理。在平時使用CountDownLatch工具類時最常用的兩個方法就是await方法和countDown方法。呼叫await方法會阻塞當前執行緒直到計數器為0,呼叫countDown方法會將計數器的值減1直到減為0。下面我們來看一下await方法是怎樣呼叫的。

// 導致當前執行緒等待, 直到門閂減少到0, 或者執行緒被打斷
public void await() throws InterruptedException {
    // 以響應執行緒中斷方式獲取
    sync.acquireSharedInterruptibly(1);
}

// 以可中斷模式獲取鎖(共享模式)
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
    // 首先判斷執行緒是否中斷, 如果是則丟擲異常
    if (Thread.interrupted()) {
        throw new InterruptedException();
    }
    // 1.嘗試去獲取鎖
    if (tryAcquireShared(arg) < 0) {
        // 2. 如果獲取失敗則進人該方法
        doAcquireSharedInterruptibly(arg);
    }
}

當執行緒呼叫await方法時其實是呼叫到了AQS的acquireSharedInterruptibly方法,該方法是以響應執行緒中斷的方式來獲取鎖的,上面同樣貼出了該方法的程式碼。我們可以看到在acquireSharedInterruptibly方法首先會去呼叫tryAcquireShared方法嘗試獲取鎖。

我們看到Sync裡面重寫的tryAcquireShared方法的邏輯,方法的實現邏輯很簡單,就是判斷當前同步狀態是否為0,如果為0則返回1表明可以獲取鎖,否則返回-1表示不能獲取鎖。如果tryAcquireShared方法返回1則執行緒能夠不必等待而繼續執行,如果返回-1那麼後續就會去呼叫doAcquireSharedInterruptibly方法讓執行緒進入到同步佇列裡面等待。這就是呼叫await方法會阻塞當前執行緒的原理,下面看看countDown方法是怎樣將阻塞的執行緒喚醒的。

// 減少門閂的方法
public void countDown() {
    sync.releaseShared(1);
}

// 釋放鎖的操作(共享模式)
public final boolean releaseShared(int arg) {
    // 1.嘗試去釋放鎖
    if (tryReleaseShared(arg)) {
        // 2.如果釋放成功就喚醒其他執行緒
        doReleaseShared();
        return true;
    }
    return false;
}

可以看到countDown方法裡面呼叫了releaseShared方法,該方法同樣是AQS裡面的方法,我們在上面也貼出了它的程式碼。releaseShared方法裡面首先是呼叫tryReleaseShared方法嘗試釋放鎖,tryReleaseShared方法在AQS裡面是一個抽象方法,它的具體實現邏輯在子類Sync類裡面,我們在上面貼出的Sync類程式碼裡可以找到該方法。

tryReleaseShared方法如果返回true表示釋放成功,返回false表示釋放失敗,只有當將同步狀態減1後該同步狀態恰好為0時才會返回true,其他情況都是返回false。那麼當tryReleaseShared返回true之後就會馬上呼叫doReleaseShared方法去喚醒同步佇列的所有執行緒。這樣就解釋了為什麼最後一次呼叫countDown方法將計數器減為0後就會喚醒所有被阻塞的執行緒。

private void doReleaseShared() {
       
    for (;;) {
        Node h = head;    // 設定頭節點為h
        if (h != null && h != tail) {
            int ws = h.waitStatus;   // 獲取頭節點的同步狀態
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;    // 如果CAS失敗,就不停的嘗試        
                unparkSuccessor(h);  // 喚醒其他節點
            }
            else if (ws == 0 &&
                    !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                
        }
        if (h == head)                   
            break;
    }
}

全文完!

上一篇:Java中的阻塞佇列 BlockingQueue 詳解


推薦兩篇文章:

1、Java併發系列 | CountDownLatch原始碼分析【本文原始碼分析部分參考於此】

2、AQS共享模式與併發工具類的實現