手把手教你使用 CompletableFuture
背景
在jdk5中,我們通過使用Future和Callable,可以在任務執行完畢後得到任務執行結果。可以使用isDone檢測計算是否完成,使用cancle停止執行任務,使用阻塞方法get阻塞住呼叫執行緒來獲取返回結果,使用阻塞方式獲取執行結果,有違非同步程式設計的初衷,而且Future的異常只能自己內部處理。
jdk8中加入了實現類CompletableFuture<T>,用於非同步程式設計。底層做任務使用的是ForkJoin, 顧名思義,是將任務的資料集分為多個子資料集,而每個子集,都可以由獨立的子任務來處理,最後將每個子任務的結果彙集起來。它是ExecutorService介面的一個實現,它把子任務分配給執行緒池(稱為ForkJoinPool)中的工作執行緒。 從api文件看,它實現了2個介面 CompletionStage<T>, Future<T>,CompletableFuture<T>擁有Future的所有特性。 CompletionStage支援lambda表示式,介面的方法的功能都是在某個階段得到結果後要做的事情。 CompletableFuture內建lambda表示式,支援非同步回撥,結果轉換等功能,它有以下Future實現不了的功能
-
合併兩個相互獨立的非同步計算的結果。
-
等待非同步任務的所有任務都完成。
-
等待非同步任務的其中一個任務完成就返回結果。
-
任務完成後呼叫回撥方法
-
任務完成的結果可以用於下一個任務。
-
任務完成時發出通知
-
提供原生的異常處理api
CompletableFuture的使用方法
首先說下獲取結果方式 CompletableFuture獲取結果的方式有如下4個方法:
1:get 阻塞獲取結果,實現Future的get介面,顯式丟擲異常
2:getNow(T valueIfAbsent) 獲取執行結果,如果當前任務未執行完成,則返回valueIfAbsent
3: join 執行完成後返回執行結果,或者丟擲unchecked異常
4: T get(long timeout, TimeUnit unit) 在有限時間內獲取資料
以下是CompletableFuture的建立物件以及api的使用
1: 建立CompletableFuture 物件
public static <U> CompletableFuture<U> completedFuture(U value)
靜態方法,返回一個已經計算好的CompletableFuture 比如
@Testpublic void testStatic() { CompletableFuture<String> completableFuture =CompletableFuture.completedFuture("test");//判斷cf是否 執行完畢 assertTrue(completableFuture.isDone());//getNow獲取結果,如果獲取不到,返回預設值null assertEquals("test", completableFuture.getNow(null)); }
completableFuture 還能主動結束運算,並顯示處理異常,如下是非同步執行的程式碼
@Test public void testActive() { CompletableFuture<String> completableFuture = new CompletableFuture();new Thread(() -> {try { String string = null; string.length(); Thread.currentThread().sleep(2000); // 通知完成計算 ,並將結果complete返回 completableFuture.complete("complete"); } catch (Exception e) { // 處理異常 在獲取結果地方可以捕獲到異常 completableFuture.completeExceptionally(e); } }).start(); try { // 同步等待返回結果如果thread內部未發生異常並執行了complete方法,將得到字串“complete”的結果 System.out.println(completableFuture.join()); } catch (Exception e) { //捕獲執行緒內部的異常捕獲空指標異常 System.out.println("發生異常了" + e.getMessage()); } }
2: 使用工廠方式建立cf物件
CompletableFuture主要有以下四個工廠方式建立物件的靜態方法:
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) { return asyncSupplyStage(asyncPool, supplier); } public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor) { return asyncSupplyStage(screenExecutor(executor), supplier); } public static CompletableFuture<Void> runAsync(Runnable runnable) { return asyncRunStage(asyncPool, runnable); } public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor) { return asyncRunStage(screenExecutor(executor), runnable); }
Supplier是java8函數語言程式設計的一個介面,是一個生產者,可以不接收引數。只有一個get方法返回一個泛型例項。 很明顯,Async結尾的都是可以非同步執行,runAsync 接收一個Runnable函式式介面型別引數,不返回結算結果。supplyAsync接收一個函式式介面型別Supplier ,可以返回計算結果。以上方法如果不指定執行任務的執行緒池Executor ,則預設使用ForkJoinPool.commonPoolcommonPool執行任務。這些介面都支援lambda實現非同步的操作。 以下是SupplyAsync非同步執行的簡單示例
@Testpublic void testSupplyAsync() { CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {//執行耗時任務 try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); }return "glz"; });//獲取結果 System.out.println(cf.join()); }
3: CompletableFuture 的非同步回撥功能
上面的方法,執行任務是非同步操作。但是呼叫執行緒還在等待結果。我們還可以給cf添加回調方法,在任務執行完成後使用cf的結果再做下一步操作,轉換。所以 執行以下方法時,cf已經計算完畢。
public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn);public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn);public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn,Executor executor);
從引數型別可以看到,這是接收一個cf計算的結果T,經過處理後返回引數型別為U的cf。 其中第一個方法是在cf完成的執行緒中呼叫。而帶Async將在與呼叫者cf不同的執行緒中非同步呼叫。
@Test public void testThenApply() { CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {// 執行耗時任務 try { Thread.sleep(2000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } return "123"; }); // 這裡cf的計算結果傳個thenApply作為引數,執行字串轉int的方法,並返回一個cf物件cf1 CompletableFuture<Integer> cf1 = cf.thenApply(Integer::parseInt); // cf1的計算結果作為引數x傳給thenApply,返回一個心得cf物件 cf2.CompletableFuture<Double> cf2 = cf1.thenApply(x -> x * 0.01); // 獲取最終結果 System.out.println(cf2.join()); //如果回撥函式比較耗時,可以使用非同步的方法thenApplyAsync}
4: 執行完成時的程式碼,即對結果進行消耗
public CompletionStage<Void> thenAccept(Consumer<? super T> action); public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action); public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor);
入參是Consumer ,執行Consumer 後沒有返回結果,所以稱為消耗。
@Test public void thenAccept(){ CompletableFuture.supplyAsync(() -> "gong").thenAccept(x -> System.out.println(s+" lz")); }
結果是 gong lz
5:上一步結果與下一步操作無關係
在執行cf後,如果得到的結果對下一步沒有影響,也就是說下一步的操作並不關心上一步的結果,最終也不返回值,可以使用thenRun 引數傳遞一個Runnable.
public CompletionStage<Void> thenRun(Runnable action);public CompletionStage<Void> thenRunAsync(Runnable action); public CompletionStage<Void> thenRunAsync(Runnable action,Executor executor); @Test public void thenRun() { CompletableFuture.supplyAsync(() -> "hello").thenRun(() -> System.out.println("hello world")); }
6: 對2個cf的結果進行組合thenCompose
public <U> CompletionStage<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn); public <U> CompletionStage<U> thenComposeAsync (Function<? super T, ? extends CompletionStage<U>> fn); public <U> CompletionStage<U> thenComposeAsync (Function<? super T, ? extends CompletionStage<U>> fn, Executor executor);
thenCompose方法,可以將2個獨立的任務進行流水線操作 。將當前cf的計算結果作為引數傳遞給後面的cf
@Test public void testCompose() { CompletableFuture<String> cf = CompletableFuture.completedFuture("hello") .thenCompose(result -> CompletableFuture.supplyAsync(() -> { System.out.println(result); return "result"; })); System.out.println(cf.join()); }
7: 結合2個cf的結果 thenCombine
可以將2個完全不相干的物件的結果整合起來,2項任務可以同時執行,比如一個對外的介面服務,既查詢資料庫中要查詢資料的總量,也要返回具體某一頁的資料,可以一個cf負責執行查詢總條數count的sql,一個查詢一頁資料。BiFunction是合併結果資料的函式
public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
其中T是呼叫thenCombine的cf的結果資料,U是other的結果,v就是合併的結果型別。
@Test public void testCombine() { CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {try { Thread.sleep(3000); System.out.println("cf1 is doning"); } catch (InterruptedException e) { e.printStackTrace(); } // 微軟雅黑, SimSun, sans-serif;">返回結果 return "hello"; }); CompletableFuture<String> result = cf1.thenCombine(CompletableFuture.supplyAsync(() -> {try { Thread.sleep(500); System.out.println("cf2 is doning"); } catch (InterruptedException e) { e.printStackTrace(); }
ystem.out.println(result.join());
return "world"; }), (x, y) -> x + y);//合併2個操作結果
S
}
8:消耗兩個cf的結果,不返回結果
<U> CompletableFuture<Void> thenAcceptBoth(CompletableFuture<? extends U> other, BiConsumer<? super T,? super U> block) CompletableFuture<Void> runAfterBoth(CompletableFuture<?> other, Runnable action)
對於2個cf,我們只想在他們執行完成時,消耗執行結果,但是不做資料返回,,我們只是希望當完成時得到通知. 此方法與thenCombine相似,只不過返回 CompletableFuture<Void> ,只做消耗處理
@Test public void testAcceptBoth(){ CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "100"); CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> 100); CompletableFuture<Void> future = future1.thenAcceptBoth(future2, (s, i) -> System.out.println(Double.parseDouble(s + i))); try { future.get(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } }
9:取計算速度最快的結果
針對兩個CompletionStage,將計算最快的那個CompletionStage的結果用來作為下一步的消耗。 此方法接受Consumer只對結果進行消耗.
public CompletionStage<Void> acceptEither(CompletionStage<? extends T> other,Consumer<? super T> action); public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? super T> action); public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? super T> action,Executor executor);
@Test public void acceptEither() { CompletableFuture.supplyAsync(() -> { try {// 如果不加sleep,可能列印hello Thread.currentThread().sleep(10L); } catch (InterruptedException e) { e.printStackTrace(); }return "hello"; }).acceptEither(CompletableFuture.supplyAsync(() -> "world"), result -> { System.out.println(result); }); }
10: 計算最快的cf的結果轉換
針對兩個CompletionStage,將計算的快的那個CompletionStage的結果用來作為下一步的轉換操作。
public <U> CompletionStage<U> applyToEither(CompletionStage<? extends T> other,Function<? super T, U> fn); public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn); public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn,Executor executor);
fn是對 呼叫applyToEither的呼叫者和 other 2個計算最快的那個結果進行處理,傳入t型別資料,返回一個CompletionStage
@Test public void applyToEither() { double result = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); }return "0.001"; }).applyToEither(CompletableFuture.supplyAsync(() -> { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); }return "0.002"; }), s -> Double.valueOf(s)).join(); System.out.println(result); } //由於返回0.002的cf睡眠時間比較短,先執行完畢,優先返回結果,所以2個cf最先返回0.002.最終result就是0.002
11:2個cf都執行完後執行操作
2個cf都執行完後,執行操作Runnable,Runnable不關心2個cf的執行結果
public CompletionStage<Void> runAfterBoth(CompletionStage<?> other,Runnable action); public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action); public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action,Executor executor); @Test public void runAfterBoth() { CompletableFuture.supplyAsync(() -> "m").runAfterBothAsync(CompletableFuture.supplyAsync(() ->"n"), () -> System.out.println("hello world")); }
12:處理cf陣列
以上介紹的都是2個future的組合使用。cf還提供allOf,引數是cf陣列,當陣列中所有的cf都執行完成時,返回一個CompletableFuture<Void>。呼叫返回的cf的join方法阻塞等待cf陣列中所有cf執行完成。 anyOf是當cf陣列中任意一個cf執行完成後,就返回一個cf。
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs); public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs);
讀者可自行編寫示例程式碼
13: 異常處理
在上面手工建立cf物件中,介紹過異常的處理,同樣使用工廠建立的cf也具有異常管理機制,讀者可自行舉一反三。
小結
本文簡單介紹了cf的使用方法,讀者可參閱java8實戰這本書,更深入學習CompletableFuture的應用場景。
參考: java8實戰