1. 程式人生 > >一篇關於CountDownLatch的好文章

一篇關於CountDownLatch的好文章

convert itl 一次 什麽 thread 並行執行 volatil his ets

CountDownLatch簡介

CountDownLatch是一種java.util.concurrent包下一個同步工具類,它允許一個或多個線程等待直到在其他線程操作執行完成。

使用場景:

在開發過程中,經常會遇到需要在主線程中開啟多條線程去並行執行任務,並且主線程需要等待所有子線程執行完畢後再進行匯總的場景,
CountDownLatch的內部提供了一個計數器,在構造閉鎖時必須指定計數器的初始值,且計數器的初始值必須大於0。另外它還提供了一個countDown方法來操作計數器的值,每調用一次countDown方法計數器都會減1,直到計數器的值減為0,
它表示所有的線程已經完成了任務,然後在閉鎖上等待的線程就可以恢復執行任務。

CountDownLatch原理

CountDownLatch底層依靠的是AQS,通過構造函數初始化計數器時,實際上是
把計數器的值賦值給了AQS的state,也就是這裏AQS的狀態值來表示計數器值。

public CountDownLatch(int count) {
	if (count < 0) throw new IllegalArgumentException("count < 0");
	this.sync = new Sync(count);
}

Sync(int count) {
   setState(count);
}

await方法

void await()方法,當前線程調用了CountDownLatch對象的await方法後,當前線程會被阻塞,直到出現下面情況之一時才會返回:

  1. 當所有線程都調用了CountDownLatch對象的countDown方法後,也就是說計時器值為 0 的時候。
  2. 其他線程調用了當前線程的interrupt()方法中斷了當前線程,當前線程會拋出InterruptedException異常後返回。接下來讓我們看看await()方法內部是如何調用
public void await() throws InterruptedException {
   sync.acquireSharedInterruptibly(1);
}
//AQS的獲取共享資源時候可被中斷的方法
public final void acquireSharedInterruptibly(int arg)throws InterruptedException {
    //如果線程被中斷則拋異常
    if (Thread.interrupted())
         throw new InterruptedException();
        //嘗試看當前是否計數值為0,為0則直接返回,否者進入AQS的隊列等待
    if (tryAcquireShared(arg) < 0)
         doAcquireSharedInterruptibly(arg);
}

//sync類實現的AQS的接口
protected int tryAcquireShared(int acquires) {
      return (getState() == 0) ? 1 : -1;
}

從上面代碼可以看到await()方法委托sync調用了AQS的acquireSharedInterruptibly方法,該方法的特點是線程獲取資源的時候可以被中斷,並且獲取到的資源是共享資源,這裏為什麽要調用AQS的這個方法,而不是調用獨占鎖的accquireInterruptibly方法呢?這是因為這裏狀態值需要的並不是非 0 即 1 的效果,而是和初始化時候指定的計數器值有關系,比如你初始化的時候計數器值為 8 ,那麽state的值應該就有 0 到 8 的狀態,而不是只有 0 和 1 的獨占效果。

這裏await()方法調用acquireSharedInterruptibly的時候傳遞的是 1 ,就是說明要獲取一個資源,而這裏計數器值是資源總數,也就是意味著是讓總的資源數減 1 ,acquireSharedInterruptibly內部首先判斷如果當前線程被中斷了則拋出異常,否則調用sync實現的tryAcquireShared方法看當前狀態值(計數器值)是否為 0 ,是則當前線程的await()方法直接返回,否則調用AQS的doAcquireSharedInterruptibly讓當前線程阻塞。另外調用tryAcquireShared的方法僅僅是檢查當前狀態值是不是為 0 ,並沒有調用CAS讓當前狀態值減去 1 。

boolean await(long timeout, TimeUnit unit)

當線程調用了 CountDownLatch 對象的該方法後,當前線程會被阻塞,直到出現下面情況之一時才會返回:

  1. 當所有線程都調用了 CountDownLatch 對象的 countDown 方法後,也就是計時器值為 0 的時候,這時候返回 true.
  2. 設置的 timeout 時間到了,因為超時而返回 false.
  3. 其它線程調用了當前線程的 interrupt()方法中斷了當前線程,當前線程會拋出 InterruptedException 異常後返回。
public boolean await(long timeout, TimeUnit unit)
        throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

countDown方法

void countDown() 當前線程調用了該方法後,會遞減計數器的值,遞減後如果計數器為 0 則會喚醒所有調用await 方法而被阻塞的線程,否則什麽都不做。

public void countDown() {
        //委托sync調用AQS的方法
	sync.releaseShared(1);
}
//AQS的方法
public final boolean releaseShared(int arg) {
	//調用sync實現的tryReleaseShared
	if (tryReleaseShared(arg)) {
		//AQS的釋放資源方法
		doReleaseShared();
		return true;
	}
	return false;
}

如上面代碼可以知道CountDownLatch的countDown()方法是委托sync調用了AQS的releaseShared方法,後者調用了sync 實現的AQS的tryReleaseShared

protected boolean tryReleaseShared(int releases) {
  //循環進行cas,直到當前線程成功完成cas使計數值(狀態值state)減一並更新到state
  for (;;) {
      int c = getState();

      //如果當前狀態值為0則直接返回(1)
      if (c == 0)
          return false;

      //CAS設置計數值減一(2)
      int nextc = c-1;
      if (compareAndSetState(c, nextc))
          return nextc == 0;
  }
}

如上代碼可以看到首先獲取當前狀態值(計數器值),代碼(1)如果當前狀態值為 0 則直接返回 false ,則countDown()方法直接返回;否則執行代碼(2)使用CAS設置計數器減一,CAS失敗則循環重試,否則如果當前計數器為 0 則返回 true 。返回 true 後,說明當前線程是最後一個調用countDown()方法的線程,那麽該線程除了讓計數器減一外,還需要喚醒調用CountDownLatch的await 方法而被阻塞的線程。這裏的代碼(1)貌似是多余的,其實不然,之所以添加代碼 (1) 是為了防止計數器值為 0 後,其他線程又調用了countDown方法,如果沒有代碼(1),狀態值就會變成負數。

getCount()方法

long getCount() 獲取當前計數器的值,也就是 AQS 的 state 的值。

public long getCount() {
     return sync.getCount();
}

int getCount() {
     return getState();
}

如上代碼可知內部還是調用了 AQS 的 getState 方法來獲取 state 的值(計數器當前值)。

使用方法(案例)

public class CountDownLatchTest {

    private static AtomicInteger id = new AtomicInteger();

    // 創建一個CountDownLatch實例,管理計數為ThreadNum
    private static volatile CountDownLatch countDownLatch = new CountDownLatch(3);

    public static void main(String[] args) throws InterruptedException {

        Thread threadOne = new Thread(new Runnable() {

            @Override
            public void run() {
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }

                System.out.println("【玩家" + id.getAndIncrement() + "】已入場");
                countDownLatch.countDown();
            }
        });

        Thread threadTwo = new Thread(new Runnable() {

            @Override
            public void run() {
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }

                System.out.println("【玩家" + id.getAndIncrement() + "】已入場");
                countDownLatch.countDown();

            }
        });

        Thread threadThree = new Thread(new Runnable() {

            @Override
            public void run() {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }

                System.out.println("【玩家" + id.getAndIncrement() + "】已入場");
                countDownLatch.countDown();

            }
        });

        // 啟動子線程
        threadOne.start();
        threadTwo.start();
        threadThree.start();
        System.out.println("等待鬥地主玩家進場");

        // 等待子線程執行完畢,返回
        countDownLatch.await();

        System.out.println("鬥地主玩家已經滿人,開始發牌.....");

    }
}
OutPut:
等待鬥地主玩家進場
【玩家0】已入場
【玩家1】已入場
【玩家2】已入場
鬥地主玩家已經滿人,開始發牌.....

如上代碼,創建了一個 CountDownLatch 實例,因為有三個子線程所以構造函數參數傳遞為 3,主線程調用 countDownLatch.await()方法後會被阻塞。子線程執行完畢後調用countDownLatch.countDown() 方法讓 countDownLatch 內部的計數器減一,等所有子線程執行完畢調用 countDown()後計數器會變為 0,這時候主線程的 await()才會返回。

CountDownLatch 與 join 方法的區別,一個區別是調用一個子線程的 join()方法後,該線程會一直被阻塞直到該線程運行完畢,而 CountDownLatch 則使用計數器允許子線程運行完畢或者運行中時候遞減計數,也就是 CountDownLatch 可以在子線程運行任何時候讓 await 方法返回而不一定必須等到線程結束;另外使用線程池來管理線程時候一般都是直接添加 Runable 到線程池這時候就沒有辦法在調用線程的 join 方法了,countDownLatch 相比 Join 方法讓我們對線程同步有更靈活的控制。

轉自: https://www.omgleoo.top/%E4%B8%80%E7%AF%87%E5%85%B3%E4%BA%8Ecountdownlatch%E7%9A%84%E5%A5%BD%E6%96%87%E7%AB%A0/

一篇關於CountDownLatch的好文章