1. 程式人生 > >Java8新特性之:CompletableFuture

Java8新特性之:CompletableFuture

java8 CompletableFuture

一. CompletableFuture

1.Future接口

Future設計的初衷:對將來某個時刻會發生的結果進行建模。

它建模了一種異步計算,返回一個執行運算結果的引用,當運算結束後,這個引用被返回給調用方。在Future中出發那些潛在耗時的操作把調用線程解放出來,讓它能繼續執行其他有價值的工作,不再需要等待耗時的操作完成。

Future的優點:比更底層的Thread更易用。

要使用Future,通常只需要將耗時的操作封裝在一個Callable對象中,再將它提交給ExecutorService。

ExecutorService executor = Executors.newCachedThreadPool();
Future<Double> future = executor.submit(new Callable<Double>() { //向ExecutorService提交一個Callable對象
    @Override
    public Double call() throws Exception {
        return doSomeLongComputation(); //以異步方式在新的線程中執行耗時的操作
    }
    });
doSomethingElse(); //異步操作進行的同時,你可以做其他的事情
try {
    //獲取異步操作的結果,如果最終被阻塞,無法得到結果,那麽在最多等待1秒鐘之後退出
    Double result = future.get(1, TimeUnit.SECONDS);
} catch (ExecutionException ee) {
    //加上拋出一個異常
} catch (InterruptedException ie) {
    //當前線程在等待過程中被中斷
} catch (TimeoutException te) {
    //在Future對象完成之前超過已過期
}


2. 實現異步API、代碼避免阻塞

使用工廠方法supplyAsync創建CompletableFuture

public Future<Double> getPriceAsync2(String product) {
    return CompletableFuture.supplyAsync(() -> calculatePrice(product));
}


supplyAsync方法接受一個生產者(Supplier)作為參數,返回一個CompletableFuture對象,該對象完成異步執行後會讀取調用生產者方法的返回值。

生產者方法會交由ForkJoinPool池中的某個執行線程(Executor)運行,但是你也可以使用supplyAsync方法的重載版本,傳遞第二個參數指定不同的執行線程執行生產者方法。

join方法等待異步操作結束

CompletableFuture類中的join方法和Future接口中的get有相同的含義,等待運行結束。並且也聲明在Future接口中,唯一的不同是join方法不會拋出任何檢測到的異常。因此使用它時不需要再使用try/catch語句塊。

public List<String> findPrices(String product) {
//使用CompletableFuture以異步方式計算每種商品的價格
List<CompletableFuture<String>> priceFutures = shops.stream()
.map(shop -> CompletableFuture.supplyAsync(() -> shop.getName() + " price is " + shop.getPriceAsync(product)))
.collect(Collectors.toList());
//等待所有異步操作結束
return priceFutures.stream().
map(CompletableFuture::join)
.collect(Collectors.toList());
}


使用定制執行器

創建一個配有線程池的執行器。

線程數的選擇:N(threads) = N(CPU) * U(CPU) * (1 + W/C)

-- N(CPU):處理器的核的數目,可以通過Runtime.getRuntime().availableProcessors()得到;

-- U(CPU):期望的CPU利用率(該值應該介於0和1之間);

-- W/C:等待時間與計算時間的比率。

//創建一個線程池,線程池中線程的數目為100和商店數目二者中較小的一個值(這裏100為線程池的上限)
private final Executor executor = Executors.newFixedThreadPool(Math.min(shops.size(), 100), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setDaemon(true); //使用守護線程--這種方式不會阻止程序的關停
return t;
}
});


集合進行並行計算有兩種方式:並行流和CompletableFutures。

-- 計算密集型操作,並且沒有I/O,推薦使用Stream接口。因為實現簡單,同時效率也可能是最高的(如果所有的線程都是計算密集型的,那就沒有必要創建比處理器核數更多的線程);

-- 如果並行的工作單元還涉及等待I/O的操作(包括網絡連接等待),那麽使用CompletableFuture靈活性更好。這種情況下處理流的流水線中如果發生I/O等待,流的延遲特性會讓我們很難判斷到底什麽時候觸發了等待。

3. 對多個異步任務進行流水線操作

thenApply:將一個由字符串轉換Quote的方法作為參數傳遞給他

thenCompose:該方法允許你對兩個異步操作進行流水線,第一個操作完成時,將其結果作為參數傳遞給第二個操作。

對第一個CompletableFuture對象調用thenCompose,並向其傳遞一個函數。當第一個CompletableFuture執行完畢後,他的結果將作為該函數的參數,這個函數的返回值是以第一個CompletableFuture的返回做輸入計算出的第二 個CompletableFuture對象。

使用thenCompose減少很多線程切換開銷。

thenCombine:將兩個CompletableFuture對象結果整合起來。該方法接收名為BiFunction的第二參數,這個參數定義了兩個CompletableFuture對象完成計算後,如何合並。

thenAccept:方法接收CompletableFuture執行完畢後的返回值做參數。不必等待那些還未返回的結果。


Java8新特性之:CompletableFuture