基於介面回撥詳解JUC中Callable和FutureTask實現原理
Callable介面和FutureTask實現類,是JUC(Java Util Concurrent)包中很重要的兩個技術實現,它們使獲取多執行緒執行結果成為可能。它們底層的實現,就是基於介面回撥技術。介面回撥,許多程式員都耳熟能詳,這種技術被廣泛應用於非同步模組的開發中。它的實現原理並不複雜,但是對初學者來說卻並不友好,其中的一個原因是它的使用場景和處理手段,對習慣了單執行緒開發的初學者來說有點繞。而各種文章或書籍,在解釋這一個問題的時候,往往忽視了使用場景,而舉一些小明坐車、A和B等等的例子,初學者看完之後往往更迷糊。
本文立足於此,就從多執行緒中執行緒結果獲取這一需求場景出發,逐步說明介面回撥及其在JUC中的應用。
需要了解Java多執行緒的底層執行機制,可以看這一篇: ofollow,noindex" target="_blank">基於JVM原理、JMM模型和CPU快取模型深入理解Java併發程式設計
執行緒結果獲取
習慣了單執行緒開發的程式設計師,在非同步程式設計中最難理解的一點,就是如何從執行緒執行結果返回資訊,因為run和start方法本身是沒有返回值的。一個基本的方法是,使用一個變數暫存執行結果,另外提供一個公共方法來返回這個變數。實現程式碼如下:
1 /* 2* 設計可以返回執行結果的執行緒 3* 定義一個執行緒讀取檔案內容, 使用字串存取結果並返回主執行緒 4*/ 5 public class ReturnDigestTest extends Thread{ 6//定義檔名 7private String fileName; 8//定義一個字串物件result, 用於存取執行緒執行結果 9private String result; 10 11public ReturnDigestTest(String fileName) { 12this.fileName = fileName; 13} 14//run方法中讀取本目錄下檔案, 並存儲至result 15@Override 16public void run() { 17try (FileInputStream fis = new FileInputStream(fileName)){ 18byte[] buffer = new byte[1024]; 19int hasRead = 0; 20while ((hasRead = fis.read(buffer)) > 0) { 21result = new String(buffer, 0, hasRead); 22} 23} catch (IOException e) { 24e.printStackTrace(); 25} 26} 27//定義返回result結果的方法 28public String getResult() { 29return result; 30} 31public static void main(String[] args) throws InterruptedException { 32//測試, 在子執行緒中執行讀取檔案, 主執行緒返回 33ReturnDigestTest returnDigestTest = new ReturnDigestTest("test.txt"); 34returnDigestTest.start(); 35//以下結果返回null. 因為getResult方法執行的時候, 子執行緒可能還沒結束 36System.out.println(returnDigestTest.getResult()); 37} 38 }
執行結果會輸出一個null,原因在於讀取檔案的執行緒需要執行時間,所以很可能到主執行緒呼叫getResult方法的時候,子執行緒還沒結束,結果就為null了。
如果在上面程式碼第35行,增加TimeUnit.SECONDS.sleep(5); 使主執行緒休眠5秒鐘,你會發現結果正確返回。
競態條件
在多執行緒環境下的實際開發場景中,更為常見的情形是,業務執行緒需要不斷迴圈獲取多個執行緒執行的返回結果。如果按照上述思路開發,那可能的結果為null,也可能導致程式掛起。上述方法是否成功,取決於競態條件(Race Condition),包括執行緒數、CPU數量、CPU運算速度、磁碟讀取速度、JVM執行緒排程演算法。
輪詢
作為對上述方法的一個優化,可以讓主執行緒定期詢問返回狀態,直到結果非空在進行獲取,這就是輪詢的思路。沿用上面的例子,只需要把36行修改如下即可:
1 //使用輪詢, 判斷執行緒返回結果是否為null 2while (true) { 3if (returnDigestTest.getResult() != null) { 4System.out.println(returnDigestTest.getResult()); 5break; 6} 7}
但是,這個方法仍然不具有普適性,在有些JVM,主執行緒會佔用幾乎所有執行時間,而導致子執行緒無法完成工作。
即便不考慮這個因素,這個方法仍然不理想,它使得CPU執行時間被額外佔用了。就好像一個搭公交的小孩,每一站都在問:請問到站了嗎?因此,比較理想的方法,是讓子執行緒在它完成任務後,通知主執行緒,這就是回撥方法。
介面回撥的應用
在非同步程式設計中,回撥的意思是,一個執行緒在執行中或完畢後,通知另外一個執行緒,返回一些訊息。而介面回撥,則是充分利用了Java多型的特徵,使用介面作為回撥方法的引用。
使用介面回撥技術來優化上面的問題,可以設計一個實現Runnable介面的類,一個回撥方法的介面,以及一個回撥方法介面的實現類(main方法所在類),具體實現如下
實現Runnable的類
1 /* 2* 使用介面回撥, 實現執行緒執行結果的返回 3*/ 4 public class CallbackDigest implements Runnable{ 5private String fileName; 6private String result; 7//定義回撥方法介面的引用 8private CallbackUserInterface cui; 9public CallbackDigest(String fileName, CallbackUserInterface cui) { 10this.fileName = fileName; 11this.cui = cui; 12} 13@Override 14public void run() { 15try (FileInputStream fis = new FileInputStream(fileName)){ 16byte[] buffer = new byte[1024]; 17int hasRead = 0; 18while((hasRead = fis.read(buffer)) > 0) { 19result = new String(buffer, 0, hasRead); 20} 21//通過回撥介面引用, 呼叫了receiveResult方法, 可以在主執行緒中返回結果. 22//此處利用了多型 23cui.receiveResult(result, fileName); 24} catch (IOException e) { 25e.printStackTrace(); 26} 27} 28 }
回撥方法介面
1 public interface CallbackUserInterface { 2//只定義了回撥方法, 傳入一個待讀取的檔名引數, 和返回結果 3public void receiveResult(String result, String fileName); 4 }
回撥方法介面實現類
1 public class CallbackTest implements CallbackUserInterface { 2//實現回撥方法 3@Override 4public void receiveResult(String result, String fileName) { 5System.out.println("檔案" + fileName + "的內容是: \n" + result); 6} 7 8public static void main(String[] args) { 9//新建回撥介面引用, 指向實現類的物件 10CallbackUserInterface test = new CallbackTest(); 11new Thread(new CallbackDigest("test.txt", test)).start(); 12} 13 }
介面回撥的技術主要有4個關鍵點:
1. 發出資訊的執行緒類 :定義回撥方法介面的引用,在構造方法中初始化。
2. 發出資訊的執行緒類 :使用回撥方法介面的引用, 來呼叫回撥方法。
3. 收取資訊的執行緒類 :實現回撥介面,新建回撥介面的引用,指向該類的物件。
4. 發出資訊的執行緒類 :新建執行緒類物件是,傳入3中新建的實現類物件。
Callable和FutureTask的使用
Callable的底層實現類似於一個回撥介面,而FutureTask類似於本例子中讀取檔案內容的執行緒實現類。因為FutureTask實現了Runnable介面,所以它的實現類是可以多執行緒的,而內部就是呼叫了Callable介面實現類的回撥方法,從而實現執行緒結果的返回機制。demo程式碼如下:
1 public class TestCallable implements Callable<Integer>{ 2//實現Callable並重寫call方法作為執行緒執行體, 並設定返回值1 3@Override 4public Integer call() throws Exception { 5System.out.println("Thread is running..."); 6Thread.sleep(3000); 7return 1; 8} 9 10public static void main(String[] args) throws InterruptedException, ExecutionException { 11//建立Callable實現類的物件 12TestCallable tc = new TestCallable(); 13//建立FutureTask類的物件 14FutureTask<Integer> task = new FutureTask<>(tc); 15//把FutureTask實現類物件作為target,通過Thread類物件啟動執行緒 16new Thread(task).start(); 17System.out.println("do something else..."); 18//通過get方法獲取返回值 19Integer integer = task.get(); 20System.out.println("The thread running result is :" + integer); 21} 22 }