1. 程式人生 > >Java 8新特性之CompletableFuture:組合式非同步程式設計

Java 8新特性之CompletableFuture:組合式非同步程式設計

隨著多核處理器的出現,提升應用程式的處理速度最有效的方式就是可以編寫出發揮多核能力的軟體,我們已經可以通過切分大型的任務,讓每個子任務並行執行,使用執行緒的方式,分支/合併框架(Java 7) 和並行流(Java 8)來實現。

現在很多大型的網際網路公司都對外提供了API服務,比如百度的地圖,微博的新聞,天氣預報等等。很少有網站或網路應用匯以完全隔離的方式工作,而是採用混聚的方式:它會使用來自多個源的內容,將這些內容聚合在一起,方便使用者使用。

比如實現一個功能,你需要在微博中搜索某個新聞,然後根據當前座標獲取天氣預報。這些呼叫第三方資訊的時候,不想因為等待搜尋新聞時,放棄對獲取天氣預報的處理,於是我們可以使用 分支/合併框架 及並行流 來並行處理,將他們切分為多個子操作,在多個不同的核、CPU甚至是機器上並行的執行這些子操作。

相反,如果你想實現併發,而不是並行,或者你的主要目標是在同一個CPU上執行幾個鬆耦合的任務,充分利用CPU的核,讓其足夠忙碌,從而最大化程式的吞吐量,那麼你其實真正想做的是避免因為等待遠端服務的返回,或者對資料庫的查詢,而阻塞執行緒的執行,浪費寶貴的計算資源,因為這種等待時間可能會很長。Future介面,尤其是它的新版實現CompletableFuture是處理這種情況的利器。

Future介面

  Future介面在java 5中被引入,設計初衷是對將來某個時刻會發生的結果進行建模。它建模了一種非同步計算,返回一個執行運算結果的引用,當運算結束後,這個引用被返回給呼叫方。在Future中觸發那些可能會耗時的操作把呼叫執行緒解放出來,讓它能繼續執行其他工作,不用一直等待耗時的操作完成,比如:你拿了一袋子衣服到洗衣店去洗衣服,洗衣店會給你張發票,告訴你什麼時候會洗好,然後你就可以去做其他的事了。Future的另一個優點是它比更底層的Thread更容易使用。使用Future只需要講耗時的操作封裝在一個Callable物件中,再將它提交給ExecutorService就可以了。 Java 8之前使用Future的例子:

    public static void main(String[] args) {
        //建立Executor-Service,通過他可以向執行緒池提交任務
        ExecutorService executor = Executors.newCachedThreadPool();
        //向executor-Service提交 Callable物件
        Future<Double> future = executor.submit(new Callable<Double>() {
            @Override
            public Double call() throws Exception {
                //非同步的方式執行耗時的操作
                return doSomeLongComputation();
            }
        });
        //非同步時,做其他的事情
        doSomethingElse();

        try{
            //獲取非同步操作的結果,如果被阻塞,無法得到結果,那麼最多等待1秒鐘之後退出
            Double result = future.get(1, TimeUnit.SECONDS);
            System.out.print(result);
        } catch (InterruptedException e) {
            System.out.print("計算丟擲一個異常");
        } catch (ExecutionException e) {
            System.out.print("當前執行緒在等待過程中被中斷");
        } catch (TimeoutException e) {
            System.out.print("future物件完成之前已過期");
        }

    }

    public static Double doSomeLongComputation() throws InterruptedException {
        Thread.sleep(1000);
        return 3 + 4.5;
    }

    public static void doSomethingElse(){
        System.out.print("else");
    }

 

這種方式可以再ExecutorService以併發的方式呼叫另外一個執行緒執行耗時的操作的同時,去執行一些其他任務。接著到已經沒有任務執行時,呼叫它的get方法來獲取操作的結果,如果操作完成,就會返回結果,否則會阻塞你的執行緒,一直到操作完成,返回響應的結果。

 

CompletableFuture

  在java 8 中引入了CompletableFuture類,它實現了Future介面,使用了Lambda表示式以及流水線的思想,通過下面這個例子進行學習,比如:我們要做一個商品查詢,根據折扣來獲取價格。

 

public class Shop {
    public double getPrice(String product) throws InterruptedException {
        //查詢商品的資料庫,或連結其他外部服務獲取折扣
        Thread.sleep(1000);
        return new Random().nextDouble() * product.charAt(0) + product.charAt(1);
    }
}

 

當呼叫這個方法時,它會阻塞程序,等待事件完成。

將同步方法轉換成非同步方法

 

    public Future<Double> getPriceAsync(String product){
        //建立CompletableFuture物件
        CompletableFuture<Double> futurePrice = new CompletableFuture<>();

        new Thread (()->{
            try {
                //在另一個執行緒中執行計算
                double price = getPrice(product);
                //需要長時間計算的任務結束並得出結果時,設定future的返回值
                futurePrice.complete(price);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
        return futurePrice;
    }

 

然後可以這樣呼叫:

        System.out.println("begin");
        Future<Double> futurePrice = shop.getPriceAsync("ss");
        System.out.println("doSomething");
        System.out.println(futurePrice.get());
        System.out.println("end");

begin
doSomething
171.47509091822835
end

這個例子中,首先會呼叫介面 立即返回一個Future物件,在這種方式下,在查詢價格的同時,還可以處理其他任務。最後所有的工作都已經完成,然後再呼叫future的get方法。獲得Future中封裝的值,要麼發生阻塞,直到該任務非同步任務完成,期望的值能夠返回。

 

錯誤處理

如果沒有意外,這個程式碼工作的會非常正常。但是如果計算價格的過程中發生了錯誤,那麼get會永久的被阻塞。這時可以使用過載的get方法,讓它超過一個時間後就強制返回。應該儘量在程式碼中使用這種方式來防止程式永久的等待下去。超時會引發TimeoutException。但是這樣會導致你無法知道具體什麼原因導致Future無法返回,這時需要使用CompletableFUture的completeExceptionally方法將導致CompletableFuture內發生的問題丟擲。

 

public Future<Double> getPriceAsync(String product){
        //建立CompletableFuture物件
        CompletableFuture<Double> futurePrice = new CompletableFuture<>();

        new Thread (()->{
            try {
                double price = getPrice(product);
                futurePrice.complete(price);
            } catch (Exception ex) {
                //丟擲異常
                futurePrice.completeExceptionally(ex);
            }
        }).start();
        return futurePrice;
    }

 

呼叫時:

 

        System.out.println("begin");
        Future<Double> futurePrice = shop.getPriceAsync("ss");
        System.out.println("doSomething");
        try {
            System.out.println(futurePrice.get(1, TimeUnit.SECONDS));
        } catch (TimeoutException e) {
            System.out.print(e);
        }
        System.out.println("end");

 

設定超時時間,然後會將錯誤資訊打印出來。

 

工廠方法supplyAsync建立CompletableFuture

使用工廠方法可以一句話來建立getPriceAsync方法

    public Future<Double> getPriceAsync(String product) {
        return CompletableFuture.supplyAsync(() -> getPrice(product));
    }

supplyAsync方法接受一個生產者(Supplier)作為引數,返回一個CompletableFuture物件,該物件完成非同步執行後悔讀取呼叫生產者方法的返回值。生產者方法會交由ForkJoinPool池中的某個執行執行緒(Executor)執行,也可以呼叫supplyAsync方法的過載版本,傳入第二個引數指定不同的執行緒執行生產者方法。 工廠方法返回的CompletableFuture物件也提供了同樣的錯誤處理機制。

 

阻塞優化

例如現在有一個商品列表,然後輸出一個字串 商品名,價格 。

 

        List<Shop> shops = Arrays.asList(
                new Shop("one"),
                new Shop("two"),
                new Shop("three"),
                new Shop("four"));


        long start = System.nanoTime();
        List<String> str = shops.stream().map(shop -> String.format("%s price: %.2f", shop.getName(), shop.getPrice(shop.getName()))).collect(toList());
        System.out.print(str);
        long end = System.nanoTime();
        System.out.print((end - start) / 1000000);

 

[one price: 161.83, two price: 126.04, three price: 153.20, four price: 166.06]
4110

 

每次呼叫getPrice方法都會阻塞1秒鐘,對付這種我們可以使用並行流來進行優化:

List<String> str = shops.parallelStream().map(shop -> String.format("%s price: %.2f", shop.getName(), shop.getPrice(shop.getName()))).collect(toList());

1137

 

明顯速度提升了,現在對四個商品查詢 實現了並行,所以只耗時1秒多點,下面我們嘗試CompletableFuture:

List<CompletableFuture<String>> str2 = shops.stream().map(shop->
                        CompletableFuture.supplyAsync(
                                ()->String.format("%s price: %.2f", shop.getName(), shop.getPrice(shop.getName())))).collect(toList());

我們使用工廠方法supplyAsync建立CompletableFuture物件,使用這種方式我們會得到一個List<CompletableFuture<String>>,列表中的每一個ComplatableFuture物件在計算完成後都會包含商品的名稱。但是我們要求返回的是List<String>,所以需要等待所有的future執行完畢,再將裡面的值提取出來,填充到列表中才能返回。

List<String> str3 =str2.stream().map(CompletableFuture::join).collect(toList());

為了返回List<String> 需要對str2新增第二個map操作,對List中的所有future物件執行join操作,一個接一個的等待他們的執行結束。CompletableFuture類中的join和Future介面中的get方法有相同的含義,並且宣告在Future介面中,唯一的不同是join不會丟擲任何檢測到的異常。

1149

現在使用了兩個不同的Stream流水線,而不是在同一個處理流的流水線上一個接一個的防治兩個map操作。考慮流操作之間的延遲特性,如果你在單一流水線中處理流,發向不同商家的請求只能以同步、順序執行的方式才會成功。因此每個建立CompletableFuture物件只能在前一個操作結束之後,再join返回計算結果。

 

更好的解決方式

並行流的版本工作的非常好,那是因為他可以並行處理8個任務,獲取作業系統執行緒數量:

System.out.print(Runtime.getRuntime().availableProcessors());

但是如果列表是9個呢?那麼執行結果就會2秒。因為他最多隻能讓8個執行緒處於繁忙狀態。 但是使用CompletableFuture允許你對執行器Executor進行配置,尤其是執行緒池的大小,這是並行流API無法實現的。

 

定製執行器

 

//建立一個執行緒池,執行緒池的數目為100何商店數目二者中較小的一個值
        final Executor executor = Executors.newFixedThreadPool(Math.min(shops.size(), 100),
                new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        Thread t = new Thread(r);
                        t.setDaemon(true); //使用守護執行緒 ---這種方式不會阻止程式的關停
                        return t;
                    }
                });

 

這個執行緒池是一個由守護執行緒構成的執行緒池,Java程式無法終止或退出正在執行中的執行緒,所以最後剩下的那個執行緒會由於一直等待無法發生的事件而引發問題。與此相反,如果將執行緒標記為守護程序,意味著程式退出時它也會被回收。這二者之間沒有效能上的差異。現在可以將執行器作為第二個引數傳遞給supplyAsync方法了。

CompletableFuture.supplyAsync(
                                ()->String.format("%s price: %.2f", shop.getName(), shop.getPrice(shop.getName()))
                                ,executor)

這時,執行9個商品時,執行速度只有1秒。 執行18個商品時也是1秒。這種狀態會一直持續,直到商店的數目達到我們之前計算的閥值。 處理需要大量使用非同步操作的情況時,這幾乎是最有效的策略。

 

對多個非同步任務進行流水線操作

我們在商品中增加一個列舉Discount.Code 來代表每個商品對應不同的折扣率,建立列舉如下:

 

public class Discount {
    public enum Code{
        NONE(0),
        SILVER(5),
        GOLD(10),
        PLATINUM(15),
        DIAMOND(20);

        private final int value;

        Code(int value){
            this.value = value;
        }
    }
}

 

現在我們修改 getPrice方法的返回格式為:ShopName:price:DiscountCode 使用  : 進行分割的返回值。

 

    public String getPrice(String product){
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        Double price = new Random().nextDouble() * product.charAt(0) + product.charAt(1);
        Discount.Code code = Discount.Code.values()[new Random().nextInt(Discount.Code.values().length)];
        return String.format("%s:%.2f:%s",name,price,code);
    }

 

返回值: one:120.10:GOLDD

將返回結果封裝到 Quote 類中:

 

public class Quote {
    private final String shopName;
    private final double price;
    private final Discount.Code discountCode;

    public Quote(String shopName, double price, Discount.Code code) {
        this.shopName = shopName;
        this.price = price;
        this.discountCode = code;
    }

    public static Quote parse(String s) {
        String[] split = s.split(":");
        String shopName = split[0];
        double price = Double.parseDouble(split[1]);
        Discount.Code discountCode = Discount.Code.valueOf(split[2]);
        return new Quote(shopName, price, discountCode);
    }

    public String getShopName() {
        return shopName;
    }

    public double getPrice() {
        return price;
    }

    public Discount.Code getDiscountCode() {
        return discountCode;
    }
}

 

parse方法 通過getPrice的方法 返回的字串 會返回Quote物件,此外 Discount服務還提供了一個applyDiscount方法,它接收一個Quote物件,返回一個字串,表示該Quote的shop中的折扣價格:

 

public class Discount {
    public enum Code{..
    }

    public static String applyDiscount(Quote quote){
        return quote.getShopName() + "price :" + Discount.apply(quote.getPrice() ,quote.getDiscountCode());
    }
    public static double apply(double price,Code code){
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        return price * (100 - code.value) / 100;
    }
}

 

Discount中 也模擬了遠端操作 睡了1秒鐘,首先我們嘗試最直接的方式:

 

        List<String> str = shops.stream()
                .map(shop->shop.getPrice("hhhhh")) //獲取 one:120.10:GOLDD 格式字串
                .map(Quote::parse) //轉換為 Quote 物件
                .map(Discount::applyDiscount) //返回 Quote的shop中的折扣價格
                .collect(toList());
                System.out.print(str);

 

8146

首先,我們呼叫getPrice遠端方法將shop物件轉換成了一個字串。每個1秒

然後,我們將字串轉換為Quote物件。

最後,我們將Quote物件 呼叫 遠端 Discount服務獲取折扣,返回折扣價格。每個1秒

順序執行4個商品是4秒,然後又呼叫了Discount服務又4秒 所以是8秒。 雖然我們現在把流轉換為並行流 效能會很好 但是數量大於8時也很慢。相反,使用自定義CompletableFuture執行器能夠更充分的利用CPU資源。

 

        List<CompletableFuture<String>> priceFutures = shops.stream()
                //非同步獲取每個shop中的價格
                .map(shop -> CompletableFuture.supplyAsync(
                        () -> shop.getPrice("hhhhh", executor)
                ))
                //Quote物件存在時,對其返回值進行轉換
                .map(future -> future.thenApply(Quote::parse))
                //使用另一個非同步任務構造期望的future,申請折扣
                .map(future -> future.thenCompose(quote ->
                        CompletableFuture.supplyAsync(
                                () -> Discount.applyDiscount(quote), executor)
                ))
                .collect(toList());
        //等待流中的所有Future執行完畢,提取各自的返回值
        List<String> str = priceFutures.stream().map(CompletableFuture::join).collect(toList());
        System.out.print(str);

 

2126

使用的這三個map跟同步沒有太大的區別,但是使用了CompletableFuture類提供的特性,在需要的地方把他們變成了非同步操作。

thenApply方法:當第一個Future執行結束,返回CompletableFuture<String>物件轉換為CompleTableFuture<Quote>物件。

thenCompose方法:將兩個非同步操作進行流水線,當第一個操作完成時,將其結果作為引數傳遞給第二個操作。換句話說,你可以建立兩個CompletableFuture物件,對第一個物件呼叫thenCompose,並向其傳遞一個函式。

這個方法也有Async版本:thenComposeAsync,通常帶字尾的版本是講任務移交到一個新執行緒,不帶字尾的在當前執行緒執行。對於這個例子我們沒有加上字尾,因為對於最終結果,或者大致的時間而言都沒有多少差別,少了很多執行緒切換的開銷。

 

合併兩個CompletableFuture,無論是否依賴

與上面不同,第二個CompletableFuture無需等待第一個CompletableFuture執行結束。而是,將兩個完全不相干的CompletableFuture物件整合起來,不希望等到第一個任務完全結束才開始第二個任務。

這種情況應該使用thenCombine方法,它接受名為BiFunction的第二個引數,這個引數定義了當兩個CompletableFuture物件完成計算後,結果如何合併。同thenCompose方法一樣,thenCombine方法也提供了一個Async的版本。使用thenCombineAsync會導致BiFunction中定義的合併操作被提交到執行緒池中,由另一個任務以非同步的方式執行。

回到這個例子,比如說我們現在需要第三個CompletableFuture來獲取匯率,展示美元。當前兩個CompletableFuture計算出結果,並由BiFunction方法完全合併後,由它來最終將誒書這一任務:

 

Future<Double> futurePriceUSD = CompletableFuture.supplyAsync(()->shops.get(0).getPrice("gg"))
                .thenCombine(
                        CompletableFuture.supplyAsync(
                                ()-> 0.66 //遠端服務獲取 匯率
                        ),(price,rate) -> price * rate
                );

 

這裡 第一個引數price 是 getPrice的返回值 double , 第二個引數 rate 是第二個工廠方法返回的0.66 偷了個懶, 最後是他們的結果進行乘法操作 返回最終結果。

 

響應CompletableFuture的completion事件

在本章中,所有的延遲例子都是延遲1秒鐘,但是在現實世界中,有時可能更糟。到目前為止,你所實現的方法必須等待所有的商品返回時才能現實商品的價格。而你希望的效果是,只要有商品返回商品價格就在第一時間顯示出來,不用等待那些還沒有返回的商品。

 

CompletableFuture[] futures = shops.stream()
                .map(shop -> CompletableFuture.supplyAsync(
                        () -> shop.getPrice("hhhhh", executor)
                ))
                .map(future -> future.thenApply(Quote::parse))
                .map(future -> future.thenCompose(quote ->
                        CompletableFuture.supplyAsync(
                                () -> Discount.applyDiscount(quote), executor)
                ))
                //在每個CompletableFuture上註冊一個操作,該操作會在CompletableFuture完成後使用它的返回值。
                //使用thenAccept將結果輸出,它的引數就是 CompletableFuture的返回值。
                .map(f -> f.thenAccept(System.out::println))
                //你可以把構成的Stream的所有CompletableFuture<void>物件放到一個數組中,等待所有的任務執行完成
                .toArray(size -> new CompletableFuture[size]);
       
        //allOf方法接受一個CompletableFuture構成的陣列,陣列中所有的COmpletableFuture物件執行完成後,
        //它返回一個COmpletableFuture<Void>物件。所以你需要哦等待最初Stream中的所有CompletableFuture物件執行完畢,
        //對allOf方法返回的CompletableFuture執行join操作
        CompletableFuture.allOf(futures).join();

 

Connected to the target VM, address: '127.0.0.1:62278', transport: 'socket'
8twoprice :113.31
threeprice :108.15
oneprice :137.844
Disconnected from the target VM, address: '127.0.0.1:62278', transport: 'socket'
fourprice :119.2725
3768

還有一個方法anyOf,對於CompletableFuture物件陣列中有任何一個執行完畢就不在等待時使用。

小結:

  1.執行比較耗時的操作時,尤其是那些依賴一個或多個遠端服務的操作,使用非同步任務可以改善程式的效能,加快程式的響應速度。

  2.你應該儘可能的為客戶提供非同步API。使用CompletableFuture類提供的特性,能夠輕鬆的實現這一目標。

  3.CompletableFuture類還提供了異常管理的機制,然給你有機會丟擲/管理非同步任務執行中發生的異常。

  4.將同步API的呼叫封裝到一個CompletableFuture中,你能夠以非同步的方式使用其結果。

  5.如果非同步任務之間互相獨立,或者他們之間某一些的結果是另一些的輸入,你可以講這些非同步任務合併成一個。

  6.你可以為CompletableFuture註冊一個回撥函式,在Future執行完畢或者他們計算的結果可用時,針對性的執行一些程式。

  7.你可以決定在什麼時候將誒書程式的執行,是等待由CompletableFuture物件構成的列表中所有的物件都執行完畢,還是隻要其中任何一個首先完成就終止程式的執行。