1. 程式人生 > >【小家java】Java8新特性之---CompletableFuture的系統講解和例項演示(使用CompletableFuture構建非同步應用)

【小家java】Java8新特性之---CompletableFuture的系統講解和例項演示(使用CompletableFuture構建非同步應用)

相關閱讀

【小家java】java5新特性(簡述十大新特性) 重要一躍
【小家java】java6新特性(簡述十大新特性) 雞肋升級
【小家java】java7新特性(簡述八大新特性) 不溫不火
【小家java】java8新特性(簡述十大新特性) 飽受讚譽
【小家java】java9新特性(簡述十大新特性) 褒貶不一
【小家java】java10新特性(簡述十大新特性) 小步迭代
【小家java】java11新特性(簡述八大新特性) 首個重磅LTS版本


非同步

傳統單執行緒環境下,呼叫函式是同步的,必須等待程式返回結果後,才可進行其他處理。因此為了提高系統整體的併發效能,引入了非同步執行~

jdk中已經內建future模式的實現。Future是Java5新增的類,用來描述一個非同步計算的結果。可以用isDone方法來檢查計算是否完成,或者使用get阻塞住呼叫執行緒,直至計算完成返回結果,也可以用cancel方法來停止任務的執行。

Futrue非同步模式存在的問題

Future以及相關使用方法提供了非同步執行任務的能力,但對於結果的獲取卻是不方便,只能通過阻塞或輪詢的方式得到任務結果。

阻塞的方式與我們理解的非同步程式設計其實是相違背的,而輪詢又會耗無謂的CPU資源。而且還不能及時得到計算結果,為什麼不能用觀察者設計模式當計算結果完成及時通知監聽者呢?

很多語言像Node.js,採用回撥的方式實現非同步程式設計。Java的一些框架像Netty,自己擴充套件Java的Future介面,提供了addListener等多個擴充套件方法。

guava裡面也提供了通用的擴充套件Future: ListenableFuture\SettableFuture以及輔助類Futures等,方便非同步程式設計

作為正統Java類庫,是不是應該加點什麼特性,可以加強一下自身庫的功能?

JDK8引入中重磅類庫:CompletableFuture

Java8裡面新增加了一個包含50個方法左右的類:CompletableFuture. 提供了非常強大的Future的擴充套件功能,可以幫助簡化非同步程式設計的複雜性,提供了函數語言程式設計能力,可以通過回撥的方式計算處理結果,並且提供了轉換和組織CompletableFuture的方法。

JDK1.8才新加入的一個實現類CompletableFuture,實現了Future, CompletionStage兩個介面。

CompletableFuture實現了CompletionStage介面的如下策略:
  1. 為了完成當前的CompletableFuture介面或者其他完成方法的回撥函式的執行緒,提供了非非同步的完成操作。
  2. 沒有顯式入參Executor的所有async方法都使用ForkJoinPool.commonPool()為了簡化監視、除錯和跟蹤,所有生成的非同步任務都是標記介面AsynchronousCompletionTask的例項。
  3. 所有的CompletionStage方法都是獨立於其他共有方法實現的,因此一個方法的行為不會受到子類中其他方法的覆蓋
CompletableFuture實現了Futurre介面的如下策略:
  1. CompletableFuture無法直接控制完成,所以cancel操作被視為是另一種異常完成形式。方法isCompletedExceptionally可以用來確定一個CompletableFuture是否以任何異常的方式完成。
  2. 以一個CompletionException為例,方法get()和get(long,TimeUnit)丟擲一個ExecutionException,對應CompletionException。為了在大多數上下文中簡化用法,這個類還定義了方法join()和getNow,而不是直接在這些情況中直接丟擲CompletionException。

CompletableFuture中4個非同步執行任務靜態方法:

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
        return asyncSupplyStage(asyncPool, supplier);
    }

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor) {
    return asyncSupplyStage(screenExecutor(executor), supplier);
}

public static CompletableFuture<Void> runAsync(Runnable runnable) {
    return asyncRunStage(asyncPool, runnable);
}

public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor) {
    return asyncRunStage(screenExecutor(executor), runnable);
}

其中supplyAsync用於有返回值的任務,runAsync則用於沒有返回值的任務。Executor引數可以手動指定執行緒池,否則預設ForkJoinPool.commonPool()系統級公共執行緒池,
注意:這些執行緒都是Daemon執行緒,主執行緒結束Daemon執行緒不結束,只有JVM關閉時,生命週期終止

主動完成計算

CompletableFuture 類實現了CompletionStage和Future介面,所以還是可以像以前一樣通過阻塞或輪詢的方式獲得結果。儘管這種方式不推薦使用。

如下四個方法都可以獲取結果:

public T 	get()  //Futrue的方法 阻塞
public T 	get(long timeout, TimeUnit unit) //Futrue的方法 阻塞
// 新提供的方法
public T 	getNow(T valueIfAbsent) //getNow有點特殊,如果結果已經計算完則返回結果或拋異常,否則返回給定的valueIfAbsent的值(此方法有點反人類有木有)
public T 	join() // 返回計算的結果或丟擲一個uncheckd異常。 推薦使用

上面4個方法,推薦使用join,還有帶超時時間的get方法

CompletableFuture並非一定要交給執行緒池執行才能實現非同步,你可以像下面這樣實現非同步執行:

     public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> completableFuture = new CompletableFuture();
        
        //自己開個執行緒去執行 執行完把結果告訴completableFuture即可
        new Thread(() -> {
            // 模擬執行耗時任務
            System.out.println("task doing...");
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // 告訴completableFuture任務已經完成 並且把結果告訴completableFuture
            completableFuture.complete("ok"); //這裡把你信任的結果set進去後,所有阻塞的get()方法都能立馬蘇醒,獲得到結果
        }).start();
        // 獲取任務結果,如果沒有完成會一直阻塞等待
        System.out.println("準備列印結果...");
        String result = completableFuture.get();
        System.out.println("計算結果:" + result);
    }
輸出:
準備列印結果...
task doing...
計算結果:ok

如果沒有意外,上面發的程式碼工作得很正常。但是,如果任務執行過程中產生了異常會怎樣呢?如下:只加一句1/0的程式碼

 //自己開個執行緒去執行 執行完把結果告訴completableFuture即可
        new Thread(() -> {
            // 模擬執行耗時任務
            System.out.println("task doing...");
            try {
                Thread.sleep(3000);
                System.out.println(1 / 0);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // 告訴completableFuture任務已經完成 並且把結果告訴completableFuture
            completableFuture.complete("ok");
        }).start();

這種情況下會得到一個相當糟糕的結果:異常會被限制在執行任務的執行緒的範圍內,最終會殺死該守護執行緒,而主執行緒,將永遠永遠阻塞了。

怎麼解決呢?

  • 使用get(long timeout, TimeUnit unit)代替get()方法,它使用一個超時引數來避免發生這樣的情況。這是一種值得推薦的做法,我們應該儘量在你的程式碼中新增超時判斷的邏輯,避免發生類似的問題。

使用這種方法至少能防止程式永久地等待下去,超時發生時,程式會得到通知發生了TimeoutException 。不過,也因為如此,你不能確定執行任務的執行緒內到底發生了什麼問題(因此自己要做好權衡)。

  • 更好的解決方案是:為了能獲取任務執行緒內發生的異常,你需要使用
    CompletableFuture的completeExceptionally方法將導致CompletableFuture內發生問題的異常丟擲。這樣,當執行任務發生異常時,呼叫get()方法的執行緒將會收到一個 ExecutionException異常,該異常接收了一個包含失敗原因的Exception 引數。
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> completableFuture = new CompletableFuture();

        //自己開個執行緒去執行 執行完把結果告訴completableFuture即可
        new Thread(() -> {
            // 模擬執行耗時任務
            System.out.println("task doing...");
            try {
                Thread.sleep(3000);
                System.out.println(1 / 0);
            //} catch (InterruptedException e) {
            } catch (Exception e) {
                // 告訴completableFuture任務發生異常了
                completableFuture.completeExceptionally(e);
                e.printStackTrace();
            }
            // 告訴completableFuture任務已經完成 並且把結果告訴completableFuture
            completableFuture.complete("ok");
        }).start();
        // 獲取任務結果,如果沒有完成會一直阻塞等待
        System.out.println("準備列印結果...");
        String result = completableFuture.get();
        System.out.println("計算結果:" + result);
    }

這樣子,如果內部發生了異常,呼叫get方法的時候就能得到這個Exception,進而能拿到拋異常的原因了。

使用案例

在Java8中,CompletableFuture提供了非常強大的Future的擴充套件功能,可以幫助我們簡化非同步程式設計的複雜性,並且提供了函數語言程式設計的能力,可以通過回撥的方式處理計算結果,也提供了轉換和組合 CompletableFuture 的方法。

它可能代表一個明確完成的Future,也有可能代表一個完成階段( CompletionStage ),它支援在計算完成以後觸發一些函式或執行某些動作。

建立CompletableFuture

四個靜態方法(如上),一個空建構函式

whenComplete計算結果完成時的處理

當CompletableFuture的計算結果完成,或者丟擲異常的時候,我們可以執行特定的Action。主要是下面的方法:

public CompletableFuture<T> 	whenComplete(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> 	whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> 	whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
public CompletableFuture<T>     exceptionally(Function<Throwable,? extends T> fn)

可以看到Action的型別是BiConsumer<? super T,? super Throwable>,它可以處理正常的計算結果,或者異常情況。
方法不以Async結尾,意味著Action使用相同的執行緒執行,而Async可能會使用其它的執行緒去執行(如果使用相同的執行緒池,也可能會被同一個執行緒選中執行)。

注意這幾個方法都會返回CompletableFuture。

CompletableFuture.supplyAsync(() -> 100)
                .thenApplyAsync(i -> i * 10)
                .thenApply(i -> i.toString())
                .whenComplete((r, e) -> System.out.println(r + "_____" + e)); //1000_____null

        //若有異常
        CompletableFuture.supplyAsync(() -> 1 / 0)
                .thenApplyAsync(i -> i * 10)
                .thenApply(i -> i.toString())
                .whenComplete((r, e) -> System.out.println(r + "_____" + e)); //null_____java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
handle、 thenApply相當於回撥函式(callback) 當然也有轉換的作用
public <U> CompletableFuture<U> 	handle(BiFunction<? super T,Throwable,? extends U> fn)
public <U> CompletableFuture<U> 	handleAsync(BiFunction<? super T,Throwable,? extends U> fn)
public <U> CompletableFuture<U> 	handleAsync(BiFunction<? super T,Throwable,? extends U> fn, Executor executor)


    public <U> CompletableFuture<U> thenApply(
        Function<? super T,? extends U> fn) {
        return uniApplyStage(null, fn);
    }

    public <U> CompletableFuture<U> thenApplyAsync(
        Function<? super T,? extends U> fn) {
        return uniApplyStage(asyncPool, fn);
    }

    public <U> CompletableFuture<U> thenApplyAsync(
        Function<? super T,? extends U> fn, Executor executor) {
        return uniApplyStage(screenExecutor(executor), fn);
    }

使用方式如下:

    public static void main(String[] args) {

        CompletableFuture.supplyAsync(() -> 100)
                .thenApplyAsync(i -> i * 10)
                .thenApply(i -> i.toString())
                .whenComplete((r, e) -> System.out.println(r + "_____" + e)); //1000_____null

        //若有異常
        CompletableFuture.supplyAsync(() -> 1 / 0)
                .thenApplyAsync(i -> i * 10)
                .thenApply(i -> i.toString())
                .whenComplete((r, e) -> System.out.println(r + "_____" + e)); //null_____java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero

        //上面效果 或者下面這麼寫也行(但上面那麼寫 連同異常都可以處理) 全部匿名方式 效率高 程式碼也優雅
        //CompletableFuture<String> f = CompletableFuture.supplyAsync(() -> 100)
        //       .thenApplyAsync(i -> i * 10)
        //        .thenApply(i -> i.toString());
        //System.out.println(f.get()); //"1000"
    }

我們會發現,結合Java8的流式處理,簡直絕配。程式碼看起來特別的優雅,關鍵還效率高,連異常都一下子給我們抓住了,簡直完美。

thenApply與handle方法的區別在於handle方法會處理正常計算值和異常,因此它可以遮蔽異常,避免異常繼續丟擲。而thenApply方法只是用來處理正常值,因此一旦有異常就會丟擲。

thenAccept與thenRun(純消費(執行Action))
    public CompletableFuture<Void> thenAccept(Consumer<? super T> action) {
        return uniAcceptStage(null, action);
    }

    public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) {
        return uniAcceptStage(asyncPool, action);
    }

    public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action,
                                                   Executor executor) {
        return uniAcceptStage(screenExecutor(executor), action);
    }

    public CompletableFuture<Void> thenRun(Runnable action) {
        return uniRunStage(null, action);
    }

    public CompletableFuture<Void> thenRunAsync(Runnable action) {
        return uniRunStage(asyncPool, action);
    }

    public CompletableFuture<Void> thenRunAsync(Runnable action,
                                                Executor executor) {
        return uniRunStage(screenExecutor(executor), action);
    }
  • 可以看到,thenAccept和thenRun都是無返回值的。如果說thenApply是不停的輸入輸出的進行生產,那麼thenAccept和thenRun就是在進行消耗。它們是整個計算的最後兩個階段。
  • 同樣是執行指定的動作,同樣是消耗,二者也有區別:
    thenAccept接收上一階段的輸出作為本階段的輸入
    thenRun根本不關心前一階段的輸出,根本不不關心前一階段的計算結果,因為它不需要輸入引數(thenRun使用的是Runnable,若你只是單純的消費,不需要啟用執行緒時,就用thenAccept更合適

上面的方法是當計算完成的時候,會生成新的計算結果(thenApply, handle),或者返回同樣的計算結果whenComplete。CompletableFuture還提供了一種處理結果的方法,只對結果執行Action,而不返回新的計算值,因此計算值為Void:

public static void main(String[] args) {
        CompletableFuture<Void> f = CompletableFuture.supplyAsync(() -> 100)
                .thenAccept(x -> System.out.println(x)); //100
        //如果此句話get不呼叫  也是能夠輸出100的 上面也會有輸出的
        System.out.println(f.join()); //null 返回null,所以thenAccept是木有返回值的

        //thenRun的案例演示
        CompletableFuture<Void> f2 = CompletableFuture.supplyAsync(() -> 100)
                .thenRun(() -> System.out.println("不需要入參")); //不需要入參
        System.out.println(f2.join()); //null 返回null,所以thenAccept是木有返回值的

    }

thenAcceptBoth以及相關方法提供了類似的功能,當兩個CompletionStage都正常完成計算的時候,就會執行提供的action,它用來組合另外一個非同步的結果。
runAfterBoth是當兩個CompletionStage都正常完成計算的時候,執行一個Runnable,這個Runnable並不使用計算的結果。

public <U> CompletableFuture<Void> 	thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action)
public <U> CompletableFuture<Void> 	thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action)
public <U> CompletableFuture<Void> 	thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action, Executor executor)
public     CompletableFuture<Void> 	runAfterBoth(CompletionStage<?> other,  Runnable action)

例子:

    public static void main(String[] args) {
        CompletableFuture<Void> f =  CompletableFuture.supplyAsync(() -> 100)
                // 第二個消費者:x,y顯然是可以把前面幾個的結果都拿到,然後再做處理
                .thenAcceptBoth(CompletableFuture.completedFuture(10), (x, y) -> System.out.println(x * y)); //1000
        System.out.println(f.join()); //null
    }
thenCombine、thenCompose整合兩個計算結果
public <U,V> CompletableFuture<V> 	thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
public <U,V> CompletableFuture<V> 	thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V