1. 程式人生 > >java 如何實現等待子執行緒結束

java 如何實現等待子執行緒結束

工作中往往會遇到非同步去執行某段邏輯, 然後先處理其他事情, 處理完後再把那段邏輯的處理結果進行彙總的產景, 這時候就需要使用執行緒了。
一個執行緒啟動之後, 是非同步的去執行需要執行的內容的, 不會影響主執行緒的流程,  往往需要讓主執行緒指定後, 等待子執行緒的完成. 這裡有幾種方式.
站在 主執行緒的角度, 我們可以分為主動式和被動式.
主動式指主線主動去檢測某個標誌位, 判斷子執行緒是否已經完成. 被動式指主執行緒被動的等待子執行緒的結束, 很明顯, 比較符合人們的胃口. 就是你事情做完了, 你告訴我, 我彙總一下, 哈哈.
那麼主執行緒如何等待子執行緒工作完成呢. 很簡單, Thread 類給我們提供了join 系列的方法, 這些方法的目的就是等待當前執行緒的die. 舉個例子:

?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 public class Threads { public static void main(String[] args) { SubThread thread = new SubThread(); thread.start(); //主執行緒處理其他工作,讓子執行緒非同步去執行. mainThreadOtherWork();
System.out.println("now waiting sub thread done."); //主執行緒其他工作完畢,等待子執行緒的結束, 呼叫join系列的方法即可(可以設定超時時間) try { thread.join(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("now all done."); } private static void mainThreadOtherWork() { System.out.println("main thread work start"
); try { Thread.sleep(3000L); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("main thread work done."); } public static class SubThread extends Thread { @Override public void run() { working(); } private void working() { System.out.println("sub thread start working."); busy(); System.out.println("sub thread stop working."); } private void busy() { try { sleep(5000L); } catch (InterruptedException e) { e.printStackTrace(); } } } }

本程式的資料有可能是如下:

main thread work start
sub thread start working.
main thread work done.
now waiting sub thread done.
sub thread stop working.
now all done.

忽略標號, 當然輸出也有可能是1和2調換位置了. 這個我們是無法控制的. 我們看下執行緒的join操作, 究竟幹了什麼.

?
1 2 3 public final void join() throws InterruptedException { join(0); }

這裡是呼叫了

?
1 2 public final synchronized void join(long millis) throws InterruptedException

方法, 引數為0, 表示沒有超時時間, 等到執行緒結束為止. join(millis)方法裡面有這麼一段程式碼:

?
1 2 3 while (isAlive()) { wait(0); }

說明, 當執行緒處於活躍狀態的時候, 會一直等待, 直到這裡的isAlive方法返回false, 才會結束.isAlive方法是一個本地方法, 他的作用是判斷執行緒是否已經執行結束. 註釋是這麼寫的:

Tests if this thread is alive. A thread is alive if it has been started and has not yet died.

可見, join系列方法可以幫助我們等待一個子執行緒的結束.
那麼要問, 有沒有另外一種方法可以等待子執行緒結束? 當然有的, 我們可以使用併發包下面的Future模式.
Future是一個任務執行的結果, 他是一個將來時, 即一個任務執行, 立即非同步返回一個Future物件, 等到任務結束的時候, 會把值返回給這個future物件裡面. 我們可以使用ExecutorService介面來提交一個執行緒.

?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 public class Threads { static ExecutorService executorService = Executors.newFixedThreadPool(1); @SuppressWarnings("rawtypes") public static void main(String[] args) throws InterruptedException, ExecutionException { SubThread thread = new SubThread(); // thread.start(); Future future = executorService.submit(thread); mainThreadOtherWork(); System.out.println("now waiting sub thread done."); future.get(); // try { // thread.join(); // } catch (InterruptedException e) { // e.printStackTrace(); // } System.out.println("now all done."); executorService.shutdown(); } private static void mainThreadOtherWork() { System.out.println("main thread work start"); try { Thread.sleep(3000L); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("main thread work done."); } public static class SubThread extends Thread { @Override public void run() { working(); } private void working() { System.out.println("sub thread start working."); busy(); System.out.println("sub thread stop working."); } private void busy() { try { sleep(5000L); } catch (InterruptedException e) { e.printStackTrace(); } } } }

這裡, ThreadPoolExecutor 是實現了 ExecutorService的方法, sumbit的過程就是把一個Runnable介面物件包裝成一個 Callable介面物件, 然後放到 workQueue裡等待排程執行. 當然, 執行的啟動也是呼叫了thread的start來做到的, 只不過這裡被包裝掉了. 另外, 這裡的thread是會被重複利用的, 所以這裡要退出主執行緒, 需要執行以下shutdown方法以示退出使用執行緒池. 扯遠了. 
這種方法是得益於Callable介面和Future模式, 呼叫future介面的get方法, 會同步等待該future執行結束, 然後獲取到結果. Callbale介面的介面方法是 V call(); 是可以有返回結果的, 而Runnable的 void run(), 是沒有返回結果的. 所以, 這裡即使被包裝成Callbale介面, future.get返回的結果也是null的.如果需要得到返回結果, 建議使用Callable介面.
通過佇列來控制執行緒的進度, 是很好的一個理念. 我們完全可以自己搞個佇列, 自己控制. 這樣也可以實現. 不信看程式碼:

?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 public class Threads { // static ExecutorService executorService = Executors.newFixedThreadPool(1); static final BlockingQueue queue = new ArrayBlockingQueue(1); public static void main(String[] args) throws InterruptedException, ExecutionException { SubThread thread = new SubThread(queue); thread.start(); // Future future = executorService.submit(thread); mainThreadOtherWork(); System.out.println("now waiting sub thread done."); // future.get(); queue.take(); // try { // thread.join(); // } catch (InterruptedException e) { // e.printStackTrace(); // } System.out.println("now all done."); // executorService.shutdown(); } private static void mainThreadOtherWork() { System.out.println("main thread work start"); try { Thread.sleep(3000L); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("main thread work done."); } public static class SubThread extends Thread { private BlockingQueue queue; /** * @param queue */ public SubThread(BlockingQueue queue) { this.queue = queue; } @Override public void run() { try { working(); } finally { try { queue.put(1); } catch (InterruptedException e) { e.printStackTrace(); } } } private void working() { System.out.println("sub thread start working."); busy(); System.out.println("sub thread stop working."); } private void busy() { try { sleep(5000L); } catch (InterruptedException e) { e.printStackTrace(); } } } }

這裡是得益於我們用了一個阻塞佇列, 他的put操作和take操作都會阻塞(同步), 在滿足條件的情況下.當我們呼叫take()方法時, 由於子執行緒還沒結束, 佇列是空的, 所以這裡的take操作會阻塞, 直到子執行緒結束的時候, 往佇列裡面put了個元素, 表明自己結束了. 這時候主執行緒的take()就會返回他拿到的資料. 當然, 他拿到什麼我們是不必去關心的.
以上幾種情況都是針對子執行緒只有1個的時候. 當子執行緒有多個的時候, 情況就不妙了.
第一種方法, 你要呼叫很多個執行緒的join, 特別是當你的執行緒不是for迴圈建立的, 而是一個一個建立的時候.
第二種方法, 要呼叫很多的future的get方法, 同第一種方法.
第三種方法, 比較方便一些, 只需要每個執行緒都在queue裡面 put一個元素就好了.但是, 第三種方法, 這個佇列裡的物件, 對我們是毫無用處, 我們為了使用佇列, 而要不明不白浪費一些記憶體, 那有沒有更好的辦法呢?
有的, concurrency包裡面提供了好多有用的東東, 其中, CountDownLanch就是我們要用的.
CountDownLanch 是一個倒數計數器, 給一個初始值(>=0), 然後每countDown一次就會減1, 這很符合等待多個子執行緒結束的場景: 一個執行緒結束的時候, countDown一次, 直到所有都countDown了 , 那麼所有子執行緒就都結束了.
先看看CountDownLanch有哪些方法:

CountDownLatch

await: 會阻塞等待計數器減少到0位置. 帶引數的await是多了等待時間.
countDown: 將當前的技術減1
getCount(): 返回當前的計數
顯而易見, 我們只需要在子執行緒執行之前, 賦予初始化countDownLanch, 並賦予執行緒數量為初始值.
每個執行緒執行完畢的時候, 就countDown一下.主執行緒只需要呼叫await方法, 可以等待所有子執行緒執行結束, 看程式碼:

?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 public class Threads { // static ExecutorService executorService = Executors.newFixedThreadPool(1); static final BlockingQueue queue = new ArrayBlockingQueue(1); public static void main(String[] args) throws InterruptedException, ExecutionException { int threads = 5; CountDownLatch countDownLatch = new CountDownLatch(threads); for(int i=0;i < threads;i++){ SubThread thread = new SubThread(2000*(i+1), countDownLatch); thread.start(); } mainThreadOtherWork(); System.out.println("now waiting sub thread done."); countDownLatch.await(); System.out.println("now all done."); } private static void mainThreadOtherWork() { System.out.println("main thread work start"); try { Thread.sleep(3000L); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("main thread work done."); } public static class SubThread extends Thread{ // private BlockingQueue queue; private CountDownLatch countDownLatch; private long work; public SubThread(long work, CountDownLatch countDownLatch) { // this.queue = queue; this.countDownLatch = countDownLatch; this.work = work; } @Override public void run() { try{ working(); }finally{ countDownLatch.countDown(); } } private void working() { System.out.println(getName()+" sub thread start working."); busy(); System.out.println(getName()+" sub thread stop working."); } private void busy() { try { sleep(work); } catch (InterruptedException e) { e.printStackTrace(); } } } }

此種方法也適用於使用 ExecutorService summit 的任務的執行.
另外還有一個併發包的類CyclicBarrier, 這個是(子)執行緒之間的互相等待的利器. 柵欄, 就是把大家都在一個地方堵住, 就像水閘, 等大家都完成了之前的操作, 在一起繼續下面的操作. 不過就不再本篇的討論範圍內了.

轉載自:http://www.jiacheo.org/blog/262