1. 程式人生 > >(2.1.27.13)Java併發程式設計:Lock之CountDownLatch計數式獨享鎖

(2.1.27.13)Java併發程式設計:Lock之CountDownLatch計數式獨享鎖

CountDownLatch是一種java.util.concurrent包下一個同步工具類,它允許一個或多個執行緒等待直到在其他執行緒中一組操作執行完成。

相對於前文的鎖,它主要實現了: 呼叫指定次release後,才會釋放鎖

一、使用

public static void testCountDownLatch(){
        
        int threadCount = 10;
        
        final CountDownLatch latch = new CountDownLatch(threadCount);
        
        for(int i=0; i< threadCount; i++){
             
            new Thread(new Runnable() {
                
                @Override
                public void run() {

                    System.out.println("執行緒" + Thread.currentThread().getId() + "開始出發");

                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }

                    System.out.println("執行緒" + Thread.currentThread().getId() + "已到達終點");

                    latch.countDown();
                }
            }).start();
        }
        
        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("10個執行緒已經執行完畢!開始計算排名");
    }

二、總體結構

public class CountDownLatch {

	public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }

	/** 
	 * 該方法會使執行緒進入等待狀態,直到計數器減至0,或者執行緒被中斷。當計數器為0時,呼叫
	 * 此方法將會立即返回,不會被阻塞住。
	 */
    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
	
	/** 帶有超時功能的 await */
	public boolean await(long timeout, TimeUnit unit)
        throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }
	
	
	public void countDown() {
        sync.releaseShared(1);
    }
}

值得注意的是:

通常而言”AQS裡的state在重入鎖裡代表執行緒重入的次數,state=1代表重入鎖當前已被某個執行緒獨佔,這個執行緒每重入一次,state++“,但是在CountDownLatch中:

  1. state在AQS被例項化時構建為 必須釋放的次數
  2. state>0 表示 還需要被釋放

2.1 AQS的實現

我們回憶一下AQS需要重寫的鉤子方法:

方法名稱 描述
boolean tryAcquire(int arg) 獨佔式嘗試獲取同步狀態(通過CAS操作設定同步狀態),如果成功返回true,反之返回false
boolean tryRelease(int arg) 獨佔式釋放同步狀態,成功返回true,失敗返回false。
int tryAcquireShared(int arg) 共享式的獲取同步狀態,返回大於等於0的值,表示獲取成功,反之失敗。
boolean tryReleaseShared(int arg) 共享式釋放同步狀態,成功返回true,失敗返回false。
boolean isHeldExclusively() 判斷同步器是否在獨佔模式下被佔用,一般用來表示同步器是否被當前執行緒佔用

程式碼很簡單了

  1. tryAcquireShared必須滿足 state==0,才獲取到鎖
  2. tryReleaseShared實現state自減
public class CountDownLatch {

    private final Sync sync;

    /** CountDownLatch 的構造方法,該方法要求傳入大於0的整型數值作為計數器 */
    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        // 初始化 Sync
        this.sync = new Sync(count);
    }
    
    /** CountDownLatch 的同步控制器,繼承自 AQS */
    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;

        Sync(int count) {
            // 設定 AQS state
            setState(count);
        }

        int getCount() {
            return getState();
        }

        /** 嘗試在共享狀態下獲取同步狀態,該方法在 AQS 中是抽象方法,這裡進行了覆寫 */
        protected int tryAcquireShared(int acquires) {
            /*
             * 如果 state = 0,則返回1,表明可獲取同步狀態,
             * 此時執行緒呼叫 await 方法時就不會被阻塞。
             */ 
            return (getState() == 0) ? 1 : -1;
        }

        /** 嘗試在共享狀態下釋放同步狀態,該方法在 AQS 中也是抽象方法 */
        protected boolean tryReleaseShared(int releases) {
            /*
             * 下面的邏輯是將 state--,state 減至0時,呼叫 await 等待的執行緒會被喚醒。
             * 這裡使用迴圈 + CAS,表明會存在競爭的情況,也就是多個執行緒可能會同時呼叫 
             * countDown 方法。在 state 不為0的情況下,執行緒呼叫 countDown 是必須要完
             * 成 state-- 這個操作。所以這裡使用了迴圈 + CAS,確保 countDown 方法可正
             * 常執行。
             */
            for (;;) {
                // 獲取 state
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                
                // 使用 CAS 設定新的 state 值
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }
}