大家好,我是小黑,一個在網際網路苟且偷生的農民工。

先問大家一個問題,在主執行緒中建立多個執行緒,在這多個執行緒被啟動之後,主執行緒需要等子執行緒執行完之後才能接著執行自己的程式碼,應該怎麼實現呢?

Thread.join()

看過我 併發程式設計之:執行緒 的朋友應該知道怎麼做,在Thread類中有一個方法join(),這個方法是一個阻塞方法,當前執行緒會等待調動join()方法的執行緒死亡之後再繼續執行。

我們通過程式碼來看看執行結果。

  1. public class JoinDemo {
  2. public static void main(String[] args) throws InterruptedException {
  3. for (int i = 0; i < 100; i++) {
  4. Thread t = new Thread(() -> {
  5. System.out.println(Thread.currentThread().getName() + " run ~");
  6. });
  7. t.start();
  8. t.join();
  9. }
  10. System.out.println("main執行緒執行結束");
  11. }
  12. }

從結果可以看出,main執行緒要等到所有子執行緒都執行完之後才會繼續執行,並且每一個子執行緒是按順序執行的。

我們在來看一下join()方法是如何讓主執行緒阻塞的呢?來看一下原始碼。

  1. public final void join() throws InterruptedException {
  2. // 預設傳入0毫秒
  3. join(0);
  4. }
  5. // 本方法是synchronized的
  6. public final synchronized void join(long millis) throws InterruptedException {
  7. long base = System.currentTimeMillis();
  8. long now = 0;
  9. if (millis < 0) {
  10. throw new IllegalArgumentException("timeout value is negative");
  11. }
  12. if (millis == 0) {
  13. // 測試當前執行緒是否還活著
  14. while (isAlive()) {
  15. // 執行wait,當前執行緒等待
  16. wait(0);
  17. }
  18. } else {
  19. while (isAlive()) {
  20. long delay = millis - now;
  21. if (delay <= 0) {
  22. break;
  23. }
  24. wait(delay);
  25. now = System.currentTimeMillis() - base;
  26. }
  27. }
  28. }

從join方法的原始碼中我們可以看到幾個重要的資訊,首先join()方法預設是等待0毫秒;join(long millis)方法是一個synchronized方法;迴圈判斷當前執行緒是否還活著。什麼意思呢?

  1. main執行緒在呼叫執行緒T的join()方法時,會先獲取T物件的鎖;
  2. 在join方法中會呼叫T物件的wait()方法等待,而wait()方法會釋放T物件的鎖,並且main執行緒在執行完wait()之後會進入阻塞狀態;
  3. 最後main執行緒在被notify喚醒之後,需要再迴圈判斷T物件是否還活著,如果還活著會再次執行wait()。

而線上程執行完run()方法之後,JVM會呼叫該執行緒的exit()方法,通過notifyAll()喚醒處於等待狀態的執行緒。

  1. private void exit() {
  2. if (group != null) {
  3. // 終止group中的執行緒this
  4. group.threadTerminated(this);
  5. group = null;
  6. }
  7. /* Aggressively null out all reference fields: see bug 4006245 */
  8. target = null;
  9. /* Speed the release of some of these resources */
  10. threadLocals = null;
  11. inheritableThreadLocals = null;
  12. inheritedAccessControlContext = null;
  13. blocker = null;
  14. uncaughtExceptionHandler = null;
  15. }
  16. void threadTerminated(Thread t) {
  17. synchronized (this) {
  18. remove(t);
  19. if (nthreads == 0) {
  20. // 喚醒等待執行緒
  21. notifyAll();
  22. }
  23. if (daemon && (nthreads == 0) &&
  24. (nUnstartedThreads == 0) && (ngroups == 0))
  25. {
  26. destroy();
  27. }
  28. }
  29. }

細心的話你會發現,使用Thread.join()只能做到讓一個執行緒執行完之後,做不到同時等待多個執行緒,比如我們上面的程式碼,執行緒1執行完之後才能執行執行緒2,無法做到讓執行緒1和執行緒2同時處理。

CountDownLatch

而在JUC包中的工具類CountDownLatch具備和Thread.join()方法同樣的能力,可以等待一個執行緒執行完之後再處理,並且支援同時等待多個執行緒。我們來修改一下上面Thread.join()的例子。

  1. public class CountDownLatchDemo {
  2. public static void main(String[] args) throws InterruptedException {
  3. CountDownLatch countDownLatch = new CountDownLatch(100);
  4. for (int i = 0; i < 100; i++) {
  5. Thread t = new Thread(() -> {
  6. System.out.println(Thread.currentThread().getName() + " run ~");
  7. countDownLatch.countDown();
  8. });
  9. t.start();
  10. }
  11. countDownLatch.await();
  12. System.out.println("main執行緒執行結束");
  13. }
  14. }

CountDownLatch需要在建立時指定一個計數值,在子執行緒中執行完之後呼叫countDown()方法進行遞減,主執行緒的await()方法會等到值減為0之後繼續執行。

從執行結果我們可以看到,100個子執行緒並不是按順序執行的,而是隨機的。

我們通過CountDownLatch的原始碼來看一下是如何實現的。

  1. private final Sync sync;
  2. public CountDownLatch(int count) {
  3. if (count < 0) throw new IllegalArgumentException("count < 0");
  4. this.sync = new Sync(count);
  5. }

在CountDownLatch中我們看到有一個Sync變數,從上一期AQS原始碼解析內容中我們知道Sync是AQS的一個子類實現;

首先構造方法傳入的count值會作為引數賦值給Sync中的state變數。

然後我們來看一下線上程中的CountDownLath.countDown()方法會做些什麼事情。

  1. public void countDown() {
  2. // 釋放共享鎖
  3. sync.releaseShared(1);
  4. }
  5. public final boolean releaseShared(int arg) {
  6. if (tryReleaseShared(arg)) {
  7. doReleaseShared();
  8. return true;
  9. }
  10. return false;
  11. }

如果有看我上期AQS原始碼解析的同學一定很熟悉,這段程式碼就是共享鎖的解鎖過程,本質上就是對state-1。

那麼主執行緒是如何實現的等待呢?我們猜一下,應該是去判斷state有沒有減為0,如果減為0則代表所有的執行緒都執行完countDown()方法,則可以繼續執行,如果state還不等於0,則表示還有執行緒正在執行,等待就OK啦。

我們來看看原始碼,是否和我們猜想的一樣呢?

  1. public void await() throws InterruptedException {
  2. // 可中斷地獲取共享鎖
  3. sync.acquireSharedInterruptibly(1);
  4. }
  5. public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
  6. if (Thread.interrupted())
  7. throw new InterruptedException();
  8. // 嘗試獲取共享鎖
  9. if (tryAcquireShared(arg) < 0)
  10. // state還不是1
  11. doAcquireSharedInterruptibly(arg);
  12. }
  13. // 獲取鎖狀態,當state減為0時,返回1
  14. protected int tryAcquireShared(int acquires) {
  15. return (getState() == 0) ? 1 : -1;
  16. }
  17. private void doAcquireSharedInterruptibly(int arg)
  18. throws InterruptedException {
  19. // 排入隊尾
  20. final Node node = addWaiter(Node.SHARED);
  21. boolean failed = true;
  22. try {
  23. for (;;) {
  24. final Node p = node.predecessor();
  25. if (p == head) {
  26. int r = tryAcquireShared(arg);
  27. if (r >= 0) {
  28. setHeadAndPropagate(node, r);
  29. p.next = null; // help GC
  30. failed = false;
  31. return;
  32. }
  33. }
  34. // 執行緒在這裡park
  35. if (shouldParkAfterFailedAcquire(p, node) &&
  36. parkAndCheckInterrupt())
  37. throw new InterruptedException();
  38. }
  39. } finally {
  40. if (failed)
  41. cancelAcquire(node);
  42. }
  43. }

可以發現await()方法和我們昨天看到的共享鎖解鎖過程一模一樣,符合我們的猜想。

所以,CountDownLatch的底層實現也是依靠AQS來完成的,現在大家肯定對於AQS有更深刻的認識了。

區別

我們現在來對比一下Thread.join()和CountDownLatch有哪些區別:

  • Thread.join()是Thread類的一個方法,而CountDownLatch是JUC包中的一個工具類;
  • Thread.join()的實現是依靠Object的wait()和notifyAll()來完成的,而CountDownLatch是通過AQS完成的;
  • Thread.join()只支援讓一個執行緒等待,不支援同時等待多個執行緒,而CountDownLatch可以支援,所以CountDownLatch的效率要更高。

好的,本期內容就到這裡,我們下期見。