非同步回撥 大起底之 Java Future 與 Guava Future
future模式大起底
瘋狂創客圈 Java 分散式聊天室【 億級流量】實戰系列之 -17【 部落格園 總入口 】
文章目錄
- future模式大起底
- 1. **Future模式非同步回撥大起底**
- 1.1. **從泡茶的案例說起**
- 1.2. **何為非同步回撥**
- 1.3. **非同步阻塞悶葫蘆——join**
- 1.4. **非同步阻塞重武器——FutureTask系列類**
- 1.4.1. Callable介面
- 1.4.2. FutureTask類初探
- 1.4.3. Future介面
- 1.4.4. FutureTask再次深入
- 1.4.5. 喝茶例項演進之——獲取非同步結果
- 1.4.6. FutureTask使用流程
- 1.5. **Guava 的非同步回撥**
- 1.5.1. 能力匯入 —— FutureCallback
- 1.5.2. 能力擴充套件 —— ListenableFuture
- 1.5.3. ListenableFuture 例項從何而來
- 1.5.4. Guava非同步回撥的流程
- 1.5.5. 喝茶例項 —— 非同步回撥演進
- 寫在最後
- 瘋狂創客圈 Java 死磕系列
原始碼IDEA工程獲取連結: Java 聊天室 實戰 原始碼
寫在前面
大家好,我是作者尼恩。
前面,已經完成一個高效能的 Java 聊天程式的四件大事:
-
完成了協議選型,選擇了效能更佳的 Protobuf協議。具體的文章為: Netty+Protobuf 整合一:實戰案例,帶原始碼
-
介紹了 通訊訊息資料包的幾條設計準則。具體的文章為: Netty +Protobuf 整合二:protobuf 訊息通訊協議設計的幾個準則
-
解決了一個非常基礎的問題,這就是通訊的 **粘包和半包問題。**具體的文章為:Netty 粘包/半包 全解 | 史上最全解讀
-
前一篇檔案,已經完成了 系統三大組成模組的組成介紹。 具體的文章為:Netty聊天程式(實戰一):從0開始實戰100w級流量應用
在設計客戶端之前,發現一個非常重要的基礎知識點,沒有講到。這個知識點就是非同步回撥。
由於非同步回撥使用頻率是如此之高,所以不得不停下來,詳細介紹一下。
1. Future模式非同步回撥大起底
隨著移動網際網路的蓬勃發展,業務架構也隨之變得錯綜複雜,業務系統越來越多。打個簡單的比方:之前一個業務只需要調取一次第三方介面,如今,該業務需調取多個甚至N個不同的第三方介面,獲取N種上遊資料。通常,我們處理方法是非同步去調取這些介面。
問題就來了,如何獲取處理非同步呼叫的結果呢 ?
或者說,非同步執行緒執行完成後,如何與發起執行緒互動呢?
這就涉及到執行緒的非同步回撥問題,這也是大流量高併發不可迴避的問題。
首先,瞭解下同步、非同步、阻塞、非阻塞、回撥等相關概念;
其次,簡單介紹java future和guava future相關技術,並通過示例程式碼進一步對其進行理解;
最後,對java future和guava future進行比較。
1.1. 從泡茶的案例說起
寫到這裡,尼恩就想到了在中學8年級的語文課。在課本中,有一篇華羅庚的課文——《統籌方法》,課文介紹的是統籌方法,該方法的主要目的是合理安排工作流程中的各道工序。
裡邊舉了一個泡茶的例子。列出了三種泡茶的工序模型。在文中的三種工序流程中,有多重排列組合的模式。
工序模型一:順序模式
洗好水壺,灌上涼水,放在火上;
等水開,洗茶壺、洗茶杯;
洗完茶杯後,泡茶喝。
工序模型二:併發模式
洗好水壺,灌上涼水,放在火上;
在等待水開的時間裡,洗茶壺、洗茶杯;
等水開了,泡茶喝。
《統籌方法》這篇文章中,忽略了一個很很重要的問題: 就是等水開是一段數量級最大的時間,這個時間,遠遠超過了準備水、準備茶杯的時間。
從實際出發,為了不浪費等水開時間,尼恩在這裡增加一個動作 —— 讀書。並且,當水燒好後,通知作者停止讀書,去泡茶喝。這就相當於回撥模式。
工序模式三:回撥模式
洗好水壺,灌上涼水,放在火上;
在等待水開的時間裡,洗茶壺、洗茶杯;
在等水開的時間裡,讀書;
水開了,通知作者泡茶喝。
對比起來:順序模式效率最低,回撥模式效率最高。
以上三種模式泡茶喝的方式,使用Java,如何實現呢?
先來看一些基本的概念吧!
1.2. 何為非同步回撥
前面只是一個例子,對併發的主要模式進行形象的說明。
下面正式來說下常用的幾個和併發相關的概念。
1.2.1. 同步、非同步、阻塞、非阻塞
一:同步
所謂同步,就是在發出一個功能呼叫時,在沒有得到結果之前,該呼叫就不返回。也就是必須一件一件事做,等前一件做完了才能做下一件事。
單執行緒模式,就是絕對同步的。
二: 非同步
非同步首先必須是多執行緒模式。是指當前執行緒,向其他的非同步執行緒發出呼叫指令。當前執行緒和非同步執行緒,邏輯上同時執行。
三:阻塞
在非同步的場景下,當前執行緒阻塞住,等待非同步執行緒的執行結果。阻塞是指執行緒進入非可執行狀態,在這個狀態下,cpu不會給執行緒分配時間片,即執行緒暫停執行。
阻塞模式是效率比較低的,如果阻塞嚴重的話,相當於又回到了同步的時代。
四:非阻塞
非阻塞和阻塞的概念相對應,指在不能立刻得到結果之前,當前執行緒不會阻塞住,而會繼續向下執行。
回撥就是一種非阻塞的非同步模式。併發執行緒通過回撥,可以將結果返回給發起執行緒。除了回撥,還有其他的非阻塞非同步模式,比如訊息通訊、訊號量等等。
1.2.2. 阻塞模式的泡茶案例圖解
阻塞模式的泡茶模型,對應到前面的第二種泡茶喝的工序模型。
在阻塞模式泡茶喝的模型中,有三條執行緒,他們分別是:
執行緒一:燒水執行緒
洗好水壺,灌上涼水,放在火上;
執行緒二:清洗執行緒
洗茶壺、洗茶杯;
執行緒三:主執行緒
分別啟動燒水執行緒、清洗執行緒。等水開了,等水杯洗好了,然後泡茶喝。
具體如下圖:
1.2.3. 回撥模式的泡茶方法
前面提到,阻塞模式的效率不是最高的。
更高效率的是回撥模式。主執行緒在等待的時間了,不是死等,而是去幹讀書的活兒。等其他兩條執行緒完成後,通過回撥方式,去完成泡茶的動作。
在回撥模式泡茶喝的模型中,還是三條執行緒,他們的工作稍微有些變動:
執行緒一:燒水執行緒
洗好水壺,灌上涼水,放在火上;燒好水後,去執行泡茶回撥。
執行緒二:清洗執行緒
洗茶壺、洗茶杯;清洗完成後,也去執行一下泡茶的動作。
執行緒三:主執行緒
分別啟動燒水執行緒、清洗執行緒。然後去讀書。
具體如下圖:
嚴格來說,上圖是經不起推敲的。
為啥呢? 那個泡茶喝回撥方法,在執行的流程上,不屬於主執行緒在執行。只是在業務邏輯上,泡茶喝這個動作與主執行緒上的其他動作,關聯性更強。
上圖,更好的理解方式是,儘量站在業務流程的角度去理解。
回撥不是唯一的非阻塞方式。
還有執行緒間通訊、訊號量等等,很多的非阻塞方式。但是回撥卻是一種最好用的、也是開發中用的最多的執行緒間非阻塞的互動方式。
下面,從最原始的阻塞模式講起,起底整個非同步回撥模式。
1.3. 非同步阻塞悶葫蘆——join
Java中,執行緒有一個join操作,也叫執行緒的合併。
join操作的作用,就是完成非同步阻塞的工作——阻塞當前的執行緒,直到非同步的併發執行緒的執行完成。
1.3.1. 執行緒的join 合併
如果執行緒A的執行過程中,通過B.join操作,合併B執行緒,叫做執行緒的合併。合併的重要特點之一是,執行緒A進入阻塞模式,直到B執行緒執行完成。
為了方便表達,模擬一下包工頭的甲方和乙方。
將發起合併的執行緒A叫做甲方執行緒,被髮起的執行緒B為乙方執行緒。
簡單的說,執行緒合併就是——甲方等待乙方執行完成。換句話說,甲方將乙方執行緒合併到甲方執行緒。
在泡茶喝的例子中,主執行緒通過join操作,等待燒水執行緒和清洗執行緒。這就是一種非同步阻塞。
具體如下圖:
![img](file:///C:\Users\qinglin\AppData\Local\Temp\ksohtml\wps2659.tmp.png)
1.3.2. join 非同步阻塞例項程式碼
先看例項,再看方法的詳細介紹。
泡茶喝的非同步阻塞版本,實現如下:
package com.crazymakercircle.coccurent;
import com.crazymakercircle.util.Print;
/**
* Created by 尼恩 at 瘋狂創客圈
*/
public class JoinDemo {
public static final int SLEEP_GAP = 500;
public static String getCurThreadName() {
return Thread.currentThread().getName();
}
static class HotWarterThread extends Thread {
public HotWarterThread() {
super("** 燒水-Thread");
}
public void run() {
try {
Print.tcfo("洗好水壺");
Print.tcfo("灌上涼水");
Print.tcfo("放在火上");
//執行緒睡眠一段時間,代表燒水中
Thread.sleep(SLEEP_GAP);
Print.tcfo("水開了");
} catch (InterruptedException e) {
Print.tcfo(" 發生異常被中斷.");
}
Print.tcfo(" 執行結束.");
}
}
static class WashThread extends Thread {
public WashThread() {
super("$$ 清洗-Thread");
}
public void run() {
try {
Print.tcfo("洗茶壺");
Print.tcfo("洗茶杯");
Print.tcfo("拿茶葉");
//執行緒睡眠一段時間,代表清洗中
Thread.sleep(SLEEP_GAP);
Print.tcfo("洗完了");
} catch (InterruptedException e) {
Print.tcfo(" 發生異常被中斷.");
}
Print.tcfo(" 執行結束.");
}
}
public static void main(String args[]) {
Thread hThread = new HotWarterThread();
Thread wThread = new WashThread();
hThread.start();
wThread.start();
try {
// 合併燒水-執行緒
hThread.join();
// 合併清洗-執行緒
wThread.join();
Thread.currentThread().setName("主執行緒");
Print.tcfo("泡茶喝");
} catch (InterruptedException e) {
Print.tcfo(getCurThreadName() + "發生異常被中斷.");
}
Print.tcfo(getCurThreadName() + " 執行結束.");
}
}
演示程式中有三條執行緒:
一條是主執行緒main;
一條是燒水執行緒“hThread”;
一條是清洗執行緒“wThread”;
main執行緒,呼叫了hThread.join()例項方法,合併燒水執行緒,也呼叫了 wThread.join()例項方法,合併清洗執行緒。
另外說明一下:hThread是這裡的燒水執行緒例項的控制代碼,"** 燒水-Thread"是燒水執行緒例項的執行緒名稱,兩者不能混淆。
1.3.3. join方法的詳細介紹
join的方法應用場景:非同步阻塞場景。
具體來說:甲方(發起執行緒)的呼叫乙方(被髮起執行緒)的join方法,等待乙方執行完成;如果乙方沒有完成,甲方阻塞。
join是Thread類的一個例項方法,使用的方式大致如下:
// 合併燒水-執行緒
hThread.join();
// 合併清洗-執行緒
wThread.join();
實際上,join方法是有三個過載版本:
(1)void join(): 等待乙方執行緒執行結束,甲方執行緒重啟執行。
(2)void join(long millis): 等待乙方執行緒執行一段時間,最長等待時間為 millis 毫秒。超過millis 毫秒後,不論乙方是否結束,甲方執行緒重啟執行。
(3)void join(long millis, int nanos): 等待乙方執行緒執行一段時間,最長等待時間為 millis 毫秒,加nanos 納秒。超過時間後,不論乙方是否結束,甲方執行緒重啟執行。
強調一下容易混淆的幾點:
(1)join方法是例項方法,需要使用執行緒控制代碼去呼叫,如thread.join();
(2)執行到join程式碼的時候,不是thread所指向的執行緒阻塞,而是當前執行緒阻塞;
(3)thread執行緒代表的是被合併執行緒(乙方),當前執行緒阻塞執行緒(甲方)。當前執行緒讓出CPU,進入等待狀態。
(4)只有等到thread執行緒執行完成,或者超時,當前執行緒才能啟動執行。
join合併有一個很大的問題,就是沒有返回值。
如果燒水執行緒的水有問題,或者燒水壺壞了,mian執行緒是沒有辦法知道的。
如果清洗執行緒的茶杯有問題,清洗不來了,mian執行緒是沒有辦法知道的。
形象的說,join執行緒就是一個悶葫蘆。
還是非同步阻塞,但是需要獲得結果,怎麼辦呢?
可以使用java 的FutureTask 系列類。
1.4. 非同步阻塞重武器——FutureTask系列類
FutureTask相關的型別,處於java.util.concurrent包中,不止一個類,是一個系列。同時,這也是Java語言在1.5 版本之後提供了一種的新的多執行緒使用方法。
1.4.1. Callable介面
我們知道,非同步執行緒的一個重要介面是Runnable,這裡執行非同步執行緒的業務程式碼。但是,Runnable的run方法有一個問題,它是沒有返回的。
因此,Runnable不能用在需要有非同步返回值的非同步場景。
Java語言在1.5 版本之後重新定義了一個新的、類似Runnable的介面,Callable介面,將run方法改為了call方法,並且帶上了返回值。
Callable的程式碼如下:
package java.util.concurrent;
@FunctionalInterface
public interface Callable<V> {
V call() throws Exception;
}
Callable介面位於java.util.concurrent包中,Callable介面是一個泛型介面。也是一個“函式式介面”。唯一的抽象方法call有返回值,返回值型別為泛型形參型別。call抽象方法還有一個Exception的異常宣告,容許方法的實現版本內部的異常不經過捕獲。
Callable介面類似於Runnable。不同的是,Runnable的唯一抽象方法run沒有返回值,也沒有強制審查異常的異常宣告。比較而言,Callable介面的功能更強大一些。
有一個異想天開的問題:
作為新版的Callable介面例項,能否作為Thread執行緒例項的target來使用呢?
答案是不能。
Callable介面與Runnable介面之間沒有任何的繼承關係,而且二者唯一方法在的名字上也不同。Callable介面例項沒有辦法作為Thread執行緒例項的target來使用。
我們知道,java裡邊的執行緒型別,就是Thread。Callable需要非同步執行,就需要和Thread建立聯絡。java提供了一個搭橋的角色——FutureTask類。
1.4.2. FutureTask類初探
顧名思義,這個是一個未來執行的任務,就相當於新執行緒所執行的操作。
FutureTask 類也位於 java.util.concurrent包。
FutureTask類 建構函式的引數為 Callable,並且間接的繼承了Runnable介面。其構造器程式碼如下:
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
到了這裡,FutureTask類的作用就大致明白了。
如果還不明白,看一段例項程式碼:
Callable<Boolean> hJob = new HotWarterJob();
FutureTask<Boolean> hTask =
new FutureTask<Boolean>(hJob);
Thread hThread = new Thread(hTask, "** 燒水-Thread");
FutureTask就像一座位於Callable與Thread之間的橋。FutureTask 封裝一個Callable,然後自身又作為Thread執行緒的target。
FutureTask還有一個十分重要的貢獻。
Thread執行緒執行過程中,非同步執行緒的程式碼邏輯在Callable的call方法中,而call方法返回的結果,則需要通過 FutureTask 去獲取。
好了,這下就應該基本清楚了。
總結一下FutureTask這個媒婆的作用:
(1)負責牽線
(2)通過媒婆取得結果
為了完成這個兩個偉大的使命,FutureTask有個相對比較複雜的繼承關係,具體如下圖:
首先,FutureTask實現了一個介面——RunnableFuture介面,而該RunnableFuture介面繼承了Runnable介面和Future介面。
Runnable介面我們很熟悉,就是那個java 執行緒Runnable,代表非同步執行緒的程式碼邏輯。
Future介面又是啥呢?
提前劇透下,這個介面,就是用來獲取非同步執行緒結果的。
Future介面和Runnable介面一樣,都是牛氣沖天的介面。 而FutureTask 間接的實現這個兩大介面。
正因為FutureTask能夠有兩個很牛逼的爹,所以自己家才很牛逼。
FutureTask 既能當做一個Runnable 作為 target ,直接被Thread執行;也能作為Future用來去取得Callable的計算結果。
1.4.3. Future介面
Future介面這個不是一個複雜的介面,梳理一下,主要提供了3大功能:
(1)獲取併發的任務完成後的執行結果。
(2)能夠取消併發執行中的任務;
(3)判斷併發任務是否執行完成;
當然,第一點是最為常用的。也是這個介面的最初使命。
Future介面的程式碼如下:
package java.util.concurrent;
public interface Future<V> {
boolean cancel(boolean mayInterruptRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout,TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}
對Future介面的方法,詳細說明如下:
V get() :獲取併發任務執行的結果。注意,這個方法是阻塞性的。如果併發任務沒有執行完成,呼叫此方法的執行緒會一直阻塞,直到併發任務執行完成。
V get(Long timeout , TimeUnit unit) :獲取併發任務執行的結果。也是阻塞性的,但是會有阻塞的時間限制,如果阻塞時間超過設定的timeout時間,該方法將丟擲異常。
boolean isDone():獲取併發任務的執行狀態。如果任務執行結束,返回true。
boolean isCancelled():獲取併發任務的取消狀態。如果任務完成前被取消,則返回true。
boolean cancel(boolean mayInterruptRunning):取消併發任務的執行。
1.4.4. FutureTask再次深入
說完了FutureTask的兩個爹,再來到FutureTask自身。
在FutureTask內部,又有哪些成員和方法,具體的執行併發任務、非同步獲取任務結果的呢?
首先,FutureTask內部有一個 Callable型別的成員:
private Callable callable;
這個callable例項屬性,是構造器傳進來的。用來儲存併發執行的 Callable型別的任務。callable例項屬性,是構造器強制性的,必須要在FutureTask例項構造的時候進行初始化。
其次,FutureTask內部有一個run方法。
這個run方法,是Runnable介面在FutureTask內部的實現。在這個run方法其中,會執行到callable成員的call方法。執行完成後,結果如何提供出去呢?這就是到了最後一點。
最後,FutureTask內部有另一個 Object 型別的重要成員——outcome例項屬性:
private Object outcome;
掐指一算,就知道這個outcome屬性,是用來儲存callable成員call方法的執行結果。FutureTask類run方法執行完成callable成員的call方法後,會將結果儲存在outcome例項屬性,供FutureTask類的get例項方法獲取。
好了,重要將這個媒婆介紹完了。
如果還沒有清楚,不要緊,看一個例項就一目瞭然了。
1.4.5. 喝茶例項演進之——獲取非同步結果
回顧一下,前面的join悶葫蘆合併阻塞有一個很大的問題,就是沒有返回值。
如果燒水執行緒的水有問題,或者燒水壺壞了,mian執行緒是沒有辦法知道的。
如果清洗執行緒的茶杯有問題,清洗不來了,mian執行緒是沒有辦法知道的。
為了演示結果,給主類增加兩個成員:
static boolean warterOk = false;
static boolean cupOk =false;
代表燒水成功和清洗成功。初始值都為false。
燒水執行緒、清洗執行緒執行完後,都需要返回結果。 主執行緒獲取後,儲存在上面的兩個主類成員中。
廢話不多說,看程式碼:
package com.crazymakercircle.coccurent;
import com.crazymakercircle.util.Print;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
/**
* Created by 尼恩 at 瘋狂創客圈
*/
public class JavaFutureDemo
{
public static final int SLEEP_GAP = 500;
public static String getCurThreadName()
{
return Thread.currentThread().getName();
}
static class HotWarterJob implements Callable<Boolean> //①
{
@Override
public Boolean call() throws Exception //②
{
try
{
Print.tcfo("洗好水壺");
Print.tcfo("灌上涼水");
Print.tcfo("放在火上");
//執行緒睡眠一段時間,代表燒水中
Thread.sleep(SLEEP_GAP);
Print.tcfo("水開了");
} catch (InterruptedException e)
{
Print.tcfo(" 發生異常被中斷.");
return false;
}
Print.tcfo(" 執行結束.");
return true;
}
}
static class WashJob implements Callable<Boolean>
{
@Override
public Boolean call() throws Exception
{
try
{
Print.tcfo("洗茶壺");
Print.tcfo("洗茶杯");
Print.tcfo("拿茶葉");
//執行緒睡眠一段時間,代表清洗中
Thread.sleep(SLEEP_GAP);
Print.tcfo("洗完了");
} catch (InterruptedException e)
{
Print.tcfo(" 清洗工作 發生異常被中斷.");
return false;
}
Print.tcfo(" 清洗工作 執行結束.");
return true;
}
}
static boolean warterOk = false;
static boolean cupOk =false;
public static void drinkTea()
{
if (warterOk && cupOk)
{
Print.tcfo("泡茶喝");
}
else if (!warterOk)
{
Print.tcfo("燒水失敗,沒有茶喝了");
}
else if (!cupOk)
{
Print.tcfo("杯子洗不了,沒有茶喝了");
}
}
public static void main(String args[])
{
Callable<Boolean> hJob = new HotWarterJob();//③
FutureTask<Boolean> hTask =
new FutureTask<Boolean>(hJob);//④
Thread hThread = new Thread(hTask, "** 燒水-Thread");//⑤
Callable<Boolean> wJob = new WashJob();//③
FutureTask<Boolean> wTask =
new FutureTask<Boolean>(wJob);//④
Thread wThread = new Thread(wTask, "$$ 清洗-Thread");//⑤
hThread.start();
wThread.start();
Thread.currentThread().setName("主執行緒");
try
{
warterOk = hTask.get();
cupOk = wTask.get();
// hThread.join();
// wThread.join();
drinkTea();
} catch (InterruptedException e)
{
Print.tcfo(getCurThreadName() + "發生異常被中斷.");
} catch (ExecutionException e)
{
e.printStackTrace();
}
Print.tcfo(getCurThreadName() + " 執行結束.");
}
}
1.4.6. FutureTask使用流程
藉助上面的喝茶例項程式碼,說明一下通過FutureTask獲取非同步結果的流程步驟:
(1)非同步程式碼邏輯需要繼承Callable,通過call方法返回具體的值
static class WashJob implements Callable<Boolean>
{
@Override
public Boolean call() throws Exception
{
//..業務程式碼,並且有返回值
}
(3)從非同步邏輯到非同步執行緒,需要媒婆類FutureTask搭橋
Callable<Boolean> hJob = new HotWarterJob();//非同步邏輯
FutureTask<Boolean> hTask =
new FutureTask<Boolean>(hJob);//媒婆例項
Thread hThread = new Thread(hTask, "** 燒水-Thread");//非同步執行緒
FutureTask和Callable都是泛型類,泛型引數表示返回結果的型別。所以,在使用的時候,倆個型別的泛型引數一定需要一致的。
(3)取得非同步執行緒的執行結果,也需要FutureTask 媒婆例項做下二傳
warterOk = hTask.get();
通過FutureTask 例項的get方法,可以獲取執行緒的執行結果。
三步至此,結果到手。
總結一下,FutureTask 比 join 執行緒合併高明,能取得非同步執行緒的結果。
但是,也就未必高明到哪裡去了。為啥呢?
因為,通過FutureTask的get方法,獲取非同步結果時,主執行緒也會被阻塞的。這一點,FutureTask和join也是一致的,他們倆都是非同步阻塞模式。
非同步阻塞的效率是比較低的,被阻塞的主執行緒,不能幹任何事情,唯一能幹的,就是在傻傻等待。
如果想提高效率,需要用到非阻塞模式。這裡只講回撥模式的非阻塞,其他模式的非阻塞,請關注瘋狂創客圈的後續文章。
原生Java,除了阻塞模式的獲取結果,並沒有實現非阻塞模式的非同步回撥。如果需要用到非同步回撥,得引入一些額外的框架。
1.5. Guava 的非同步回撥
在非常著名的google 提供的擴充套件包 Guava中,提供了一種非同步回撥的解決方案。
為了實現非同步回撥,Guava 對Java的Future 非同步模式進行能力匯入:
(1)匯入了一個新的介面 FutureCallback,代表回撥執行的業務邏輯
(2)對Java併發包中的 Future 介面進行了擴充套件,將回調邏輯作為監聽器繫結到非同步執行緒
1.5.1. 能力匯入 —— FutureCallback
FutureCallback 是一個新增的介面,用來填寫回調邏輯。這個介面,是在實際開發中程式設計使用到的。回撥的程式碼,編寫在它的實現類中。
FutureCallback擁有兩個回撥方法:
(1)onSuccess ,在非同步執行緒執行成功回撥
(2)onFailure,在非同步執行緒丟擲異常時回撥
FutureCallback的原始碼如下:
public interface FutureCallback<V> {
void onSuccess(@Nullable V var1);
void onFailure(Throwable var1);
}
1.5.2. 能力擴充套件 —— ListenableFuture
如果將回調方法,繫結到非同步執行緒去呢?
Guava中,有一個非常關鍵的角色,ListenableFuture。看名稱,就能對應出它與Java 中的原生介面的親戚關係。
如果沒有猜錯,這個介面是 Guava 對java 的Future介面的擴充套件。
來看看 ListenableFuture介面的原始碼,如下:
package com.google.common.util.concurrent;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
public interface ListenableFuture<V> extends Future<V> {
void addListener(Runnable var1, Executor var2);
}
前面講到,通過Java的Future介面,可以阻塞取得非同步的結果。在這個基礎上,ListenableFuture增加了一個方法 —— addListener 。
這個方法的作用,就是將前一小節的FutureCallback 回撥邏輯,繫結到非同步執行緒上。 可以是,addListener 不直接在實際程式設計中使用。這個方法只在Guava內部使用,如果對它感興趣,可以檢視Guava原始碼。
既然addListener 方法不能直接使用,那麼,在實際程式設計中,如何將 FutureCallback 回撥邏輯繫結到非同步執行緒呢?
不慌,辦法總是有的。
需要用到Guava的Futures 工具類。這個類有一個addCallback 靜態方法,將ListenableFuture 的例項和FutureCallback 的回撥例項,進行繫結。
繫結的示意程式碼如下:
Futures.addCallback( hFuture , new FutureCallback<Boolean>()
{
public void onSuccess(Boolean r)
{
//成功時候的回撥邏輯
}
public void onFailure(Throwable t)
{
//異常時候的回撥邏輯
}
});
1.5.3. ListenableFuture 例項從何而來
從上文已知,原生java的Future介面的例項,一種方法是——直接構建媒婆類FutureTask的例項,就是Future介面的例項。
當然,還有第二種方法,就是通過執行緒池獲取Future介面的例項。具體的做法是向Java執行緒池提交非同步任務,包括Runnable或者Callable例項。
方法如下:
Future<Boolean> hTask = pool.submit(hJob);
Future<Boolean> wTask = pool.submit(wJob);
注意,pool 是一個Java 執行緒池。
如果要獲取Guava的ListenableFuture 例項,主要是通過類似上面的第二種方式——向執行緒池提交任務的非同步任務的方式獲取。不過,用到的執行緒池,是Guava的執行緒池,不是Java的執行緒池。
Guava執行緒池,而是對Java執行緒池的一種裝飾。
兩種執行緒池的建立程式碼,具體如下:
//java 執行緒池
ExecutorService jPool =
Executors.*newFixedThreadPool*(10);
//guava 執行緒池
ListeningExecutorService gPool =
MoreExecutors.*listeningDecorator*(jPool);
有了Guava的執行緒池之後,就可以通過提交任務,來獲取ListenableFuture 例項了。程式碼如下 :
ListenableFuture<Boolean> hFuture = gPool.submit(hJob);
關於Gava的執行緒池,請關注【瘋狂創客圈】的執行緒池的部落格文章。
1.5.4. Guava非同步回撥的流程
總結一下,Guava非同步回撥的流程如下:
第一步:建立Java的 Callable的非同步任務例項。例項如下:
Callable<Boolean> hJob = new HotWarterJob();//非同步任務Callable<Boolean> wJob = new WashJob();//非同步任務
非同步任務也可以是Runnable型別。
第二步: 獲取Guava執行緒池
//java 執行緒池
ExecutorService jPool =
Executors.*newFixedThreadPool*(10);
//guava 執行緒池
ListeningExecutorService gPool =
MoreExecutors.*listeningDecorator*(jPool);
第三步: 提交非同步任務到Guava執行緒池,獲取ListenableFuture 例項
ListenableFuture<Boolean> hFuture = gPool.submit(hJob);
第四步:建立回撥的 FutureCallback 例項,通過Futures.addCallback,將回調邏輯繫結到ListenableFuture 例項。
Futures.*addCallback*( hFuture , new FutureCallback<Boolean>()
{
public void onSuccess(Boolean r)
{
//成功時候的回撥邏輯
}
public void onFailure(Throwable t)
{
//異常時候的回撥邏輯
}
});
完成以上四步,當非同步邏輯執行完成後,就會回撥FutureCallback 例項中的回撥程式碼。
1.5.5. 喝茶例項 —— 非同步回撥演進
已經對喝茶例項的程式碼非常熟悉下,下面是Guava的非同步回撥的演進版本,程式碼如下:
package com.crazymakercircle.coccurent;
import com.crazymakercircle.util.Print;
import com.google.common.util.concurrent.*;
import java.util.concurrent.*;
/**
* Created by 尼恩 at 瘋狂創客圈
*/
public class GuavaFutureDemo
{
public static final int SLEEP_GAP = 500;
public static String getCurThreadName()
{
return Thread.currentThread().getName();
}
static class HotWarterJob implements Callable<Boolean> //①
{
@Override
public Boolean call() throws Exception //②
{
try
{
Print.tcfo("洗好水壺");
Print.tcfo("灌上涼水");
Print.tcfo("放在火上");
//執行緒睡眠一段時間,代表燒水中
Thread.sleep(SLEEP_GAP);
Print.tcfo("水開了");
} catch (InterruptedException e)
{
Print.tcfo(" 發生異常被中斷.");
return false;
}
Print.tcfo(" 執行結束.");
return true;
}
}
static class WashJob implements Callable<Boolean>
{
@Override
public Boolean call() throws Exception
{
try
{
Print.tcfo("洗茶壺");
Print.tcfo("洗茶杯");
Print.tcfo("拿茶葉");
//執行緒睡眠一段時間,代表清洗中
Thread.sleep(SLEEP_GAP);
Print.tcfo("洗完了");
} catch (InterruptedException e)
{
Print.tcfo(" 清洗工作 發生異常被中斷.");
return false;
}
Print.tcfo(" 清洗工作 執行結束.");
return true;
}
}
static boolean warterOk = false;
static boolean cupOk = false;
public synchronized static void drinkTea()
{
if (warterOk && cupOk)
{
Print.tcfo("泡茶喝");
}
else if (!warterOk)
{
Print.tcfo("燒水失敗,沒有茶喝了");
}
else if (!cupOk)
{
Print.tcfo("杯子洗不了,沒有茶喝了");
}
}
public static void main(String args[])
{
Thread.currentThread().setName("主執行緒");
Callable<Boolean> hJob = new HotWarterJob();//③
Callable<Boolean> wJob = new WashJob();//③
//java 執行緒池
ExecutorService jPool =
Executors.newFixedThreadPool(10);
//guava 執行緒池
ListeningExecutorService gPool =
MoreExecutors.listeningDecorator(jPool);
ListenableFuture<Boolean> hFuture = gPool.submit(hJob);
Futures.addCallback(hFuture, new FutureCallback<Boolean>()
{
public void onSuccess(Boolean r)
{
if (r)
{
warterOk = true;
drinkTea();
}
else
{
Print.tcfo("燒水失敗,沒有茶喝了");
}
}
public void onFailure(Throwable t)
{
Print.tcfo("燒水失敗,沒有茶喝了");
}
});
ListenableFuture<Boolean> wFuture = gPool.submit(wJob);
Futures.addCallback(wFuture, new FutureCallback<Boolean>()
{
public void onSuccess(Boolean r)
{
if (r)
{
cupOk = true;
drinkTea();
}
else
{
Print.tcfo("清洗失敗,沒有茶喝了");
}
}
public void onFailure(Throwable t)
{
Print.tcfo("杯子洗不了,沒有茶喝了");
}
});
try
{
Print.tcfo("讀書中......");
Thread.sleep(100000);
} catch (InterruptedException e)
{
Print.tcfo(getCurThreadName() + "發生異常被中斷.");
}
Print.tcfo(getCurThreadName() + " 執行結束.");
gPool.shutdown();
}
}
寫在最後
為什麼說非同步回撥是如此的重要呢 ? 因為高併發程式設計,到處都用到Future模式和Callback模式。
下一篇:Netty 中的Future 回撥實現與執行緒池詳解。這個也是一個非常重要的基礎篇。
瘋狂創客圈 Java 死磕系列
- Java (Netty) 聊天程式【 億級流量】實戰 開源專案實戰
- Netty 原始碼、原理、JAVA NIO 原理
- Java 面試題 一網打盡
- 瘋狂創客圈 【 部落格園 總入口 】