1. 程式人生 > >二十七、併發程式設計之併發工具類CountDownLatch詳解

二十七、併發程式設計之併發工具類CountDownLatch詳解

在JDK的併發包裡(java.util.concurrent)提供了這樣幾個非常有用的併發工具類來解決併發程式設計的流程控制。分別是CountDownLatch、CyclicBarrier和Semaphore。

1、CountDownLatch是什麼?

CountDownLatch類位於java.util.concurrent包下,利用它可以實現類似計數器的功能。CountDownLatch大多是被用在等待多執行緒完成,具體來說就是允許一個或多個執行緒等待其他執行緒完成操作。比如有一個任務A,它要等待其他4個任務執行完畢之後才能執行,此時就可以利用CountDownLatch來實現這種功能了。

2、CountDownLatch原理

CountDownLatch類只提供了一個構造器:

public void CountDownLatch(int count) {...}

在 CountDownLunch啟動的時候。主執行緒必須在啟動其他執行緒後立即呼叫CountDownLatch.await()方法。這樣主執行緒的操作就會在這個方法上阻塞,直到其他執行緒完成各自的任務。在每次任務執行完直接呼叫,計數器就會減一操作。

//呼叫await()方法的執行緒會被掛起,它會等待直到count值為0才繼續執行
public void await() throws InterruptedException {
}; //和await()類似,只不過等待一定的時間後count值還沒變為0的話就會繼續執行 public boolean await(long timeout, TimeUnit unit) throws InterruptedException { }; //將count值減1 public void countDown() { };

await()這個方法就是用來堵塞主執行緒的,前者是有等待時間的,可以自定義,後者是無限等待,直到其他count 計數器為0為止。

在這裡插入圖片描述

3、使用場景

  • 超時機制
    主執行緒裡面設定好等待時間,如果發現在規定時間內還是沒有返回結果,那就喚醒主執行緒,拋棄。
  • 開始執行前等待n個執行緒完成各自任務
    例如應用程式啟動類要確保在處理使用者請求前,所有N個外部系統已經啟動和運行了。
  • 死鎖檢測
    一個非常方便的使用場景是,你可以使用n個執行緒訪問共享資源,在每次測試階段的執行緒數目是不同的,並嘗試產生死鎖。

4、深入原始碼

CountDownLunch 原始碼底層是由AbstractQueuedSynchronizer提供支援(後面就簡稱 AQS),所以其資料結構就是AQS的資料結構,而AQS的核心就是兩個虛擬佇列:同步佇列syncQueue 和條件佇列conditionQueue(前者資料結構是雙向連結串列,後者是單向連結串列)不同的條件會有不同的條件佇列。

  • Sync原始碼
	private static final class Sync extends AbstractQueuedSynchronizer {
		// 版本號
		private static final long serialVersionUID = 4982264981922014374L;

		// 構造器
		Sync(int count) {
			setState(count);
		}

		// 返回當前計數
		int getCount() {
			return getState();
		}

		// 試圖在共享模式下獲取物件狀態
		protected int tryAcquireShared(int acquires) {
			return (getState() == 0) ? 1 : -1;
		}

		// 試圖設定狀態來反映共享模式下的一個釋放
		protected boolean tryReleaseShared(int releases) {
			// Decrement count; signal when transition to zero
			// 無限迴圈
			for (;;) {
				// 獲取狀態
				int c = getState();
				if (c == 0) // 沒有被執行緒佔有
					return false;
				// 下一個狀態
				int nextc = c-1;
				if (compareAndSetState(c, nextc)) // 比較並且設定成功
					return nextc == 0;
			}
		}
	}
  • await函式
    此函式將會使當前執行緒在鎖存器倒計數至零之前一直等待,除非執行緒被中斷。其原始碼如下:
public void await() throws InterruptedException{
      // 轉發到sync物件上 
      sync.acquireSharedInterruptibly(1);
}

原始碼可知,對CountDownLatch物件的await的呼叫會轉發為對Sync的acquireSharedInterruptibly(從AQS繼承的方法)方法的呼叫。

	public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
		if (Thread.interrupted()) 
			throw new InterruptedException(); 
		if (tryAcquireShared(arg) < 0) 
			doAcquireSharedInterruptibly(arg); 
	}

這裡先檢測了執行緒中斷狀態,中斷了則丟擲異常,接下來呼叫tryAcquireShared,tryAcquireShared是Syn的實現的。

protected int tryAcquireShared(int acquires) {
   return (getState() == 0) ? 1 : -1;
}

其實就是簡單的獲取了同步器的state,判斷是否為0。
接下來是

	private void doAcquireSharedInterruptibly(int arg)throws InterruptedException {
		final Node node = addWaiter(Node.SHARED);
		boolean failed = true;
		try {
			for (;;) {
				final Node p = node.predecessor();
				if (p == head) {
					int r = tryAcquireShared(arg);
					if (r >= 0) {
						setHeadAndPropagate(node, r);
						p.next = null; // help GC
						failed = false;
						return;
					}
				}
				if (shouldParkAfterFailedAcquire(p, node) &&
						parkAndCheckInterrupt())
					throw new InterruptedException();
			}
		} finally {
			if (failed)
				cancelAcquire(node);
		}
	}

關鍵點是parkAndCheckInterrupt()

private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
 }

執行到此處時,執行緒會阻塞,知道有其他執行緒喚醒此執行緒,執行await之後,上文中的主執行緒阻塞在這。
在這裡插入圖片描述

  • countDown函式
    此函式將遞減鎖存器的計數,如果計數到達零,則釋放所有等待的執行緒
void countDown() { 
    sync.releaseShared(1);
}

可以看出 對countDown的呼叫轉換為對Sync物件的releaseShared(從AQS繼承而來)方法的呼叫。

5、使用

/**
 * 對一個文字中的所有數字先行求和,再把所有行彙總
 * */
public class Demo2 {
	private int[] nums; //儲存每一行求和數
	
	public Demo2(int line) {
		nums = new int[line];
	}

	//行求和
	public void calc(String line, int index,CountDownLatch latch) {
		String[] nus = line.split(","); // 切分出每個值
		int total = 0;
		for (String num : nus) {
			total += Integer.parseInt(num);
		}
		nums[index] = total; // 把計算的結果放到陣列中指定的位置
		System.out.println(Thread.currentThread().getName() + " 執行計算任務... " + line + " 結果為:" + total);
		latch.countDown();//CountDownLatch計數器減1
	}

	//行彙總
	public void sum() {
		System.out.println(Thread.currentThread().getName()+"彙總執行緒開始執行... ");
		int total = 0;
		for (int i = 0; i < nums.length; i++) {
			total += nums[i];
		}
		System.out.println("最終的結果為:" + total);
	}
	//讀取文字資料
	private static List<String> readFile() {
		List<String> contents = new ArrayList<>();
		String line = null;
		BufferedReader br = null;
		try {
			br = new BufferedReader(new FileReader("/Users/Amy/Downloads/nums.txt"));
			while ((line = br.readLine()) != null) {
				contents.add(line);
			}
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			if (br != null) {
				try {
					br.close();
				} catch (IOException e) {
					e.printStackTrace();
				}
			}
		}
		return contents;
	}
	public static void main(String[] args) {
		List<String> contents = readFile();//讀取文字資料
		int lineCount = contents.size();//行數
		CountDownLatch latch = new CountDownLatch(lineCount);
		Demo2 d = new Demo2(lineCount);
		for (int i = 0; i < lineCount; i++) {
			final int j = i;
			new Thread(new Runnable() {
				@Override
				public void run() {
					d.calc(contents.get(j), j,latch);//行求和
				}
			}).start();
		}
		try {
			latch.await();//等待所有執行緒將行求和全部進行完畢
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		d.sum();//彙總求和
	}
}