1. 程式人生 > >併發程式設計(三)Promise, Future 和 Callback

併發程式設計(三)Promise, Future 和 Callback

併發程式設計(三)Promise, Future 和 Callback

在併發程式設計中,我們通常會用到一組非阻塞的模型:Promise,Future 和 Callback。其中的 Future 表示一個可能還沒有實際完成的非同步任務的結果,針對這個結果可以新增 Callback 以便在任務執行成功或失敗後做出對應的操作,而 Promise 交由任務執行者,任務執行者通過 Promise 可以標記任務完成或者失敗。 可以說這一套模型是很多非同步非阻塞架構的基礎。

這一套經典的模型在 Scala、C# 中得到了原生的支援,但 JDK 中暫時還只有無 Callback 的 Future 出現,當然也並非在 Java 界就沒有發展了,比如 Guava 就提供了 ListenableFuture 介面,而 Netty 4+ 更是提供了完整的 Promise、Future 和 Listener 機制。

一、Future 模式 - 將來式(JDK)

ExecutorService executorService = Executors.newSingleThreadExecutor();
Future<Integer> future = executorService.submit(() -> {
    TimeUnit.SECONDS.sleep(5);
    return 5;
});
Integer result = future.get();

二、Future 模式--回撥式(Guava)

Future 模式的第二種用法便是回撥。很不幸的事,JDK 實現的 Future 並沒有實現 callback, addListener 這樣的方法,想要在 JAVA 中體驗到 callback 的特性,得引入一些額外的框架。

<dependency>
   <groupId>com.google.guava</groupId>
   <artifactId>guava</artifactId>
   <version>21.0</version>
</dependency>
ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());
ListenableFuture<Integer> future = service.submit(new Callable<Integer>() {
    public Integer call() throws Exception {
        TimeUnit.SECONDS.sleep(5);
        return 100;
    }
});
Futures.addCallback(future, new FutureCallback<Integer>() {
    public void onSuccess(Integer result) {
        System.out.println("success:" + result);
    }

    public void onFailure(Throwable throwable) {
        System.out.println("fail, e = " + throwable);
    }
});

Thread.currentThread().join();

三、Future 模式--回撥式(Netty4)

Netty 除了是一個高效能的網路通訊框架之外,還對 jdk 的Future 做了擴充套件,引入 Netty 的 maven 依賴

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.22.Final</version>
</dependency>
EventExecutorGroup group = new DefaultEventExecutorGroup(4); // 4 threads
io.netty.util.concurrent.Future<Integer> f = group.submit(new Callable<Integer>() {
    @Override
    public Integer call() throws Exception {
        TimeUnit.SECONDS.sleep(5);
        return 100;
    }
});
f.addListener(new FutureListener<Object>() {
    @Override
    public void operationComplete(io.netty.util.concurrent.Future<Object> objectFuture) throws Exception {
        System.out.println("計算結果::"+objectFuture.get());
    }
});

四、由 Callback Hell 引出 Promise 模式

同樣的如果你對 ES6 有所接觸,就不會對 Promise 這個模式感到陌生,如果你對前端不熟悉,也不要緊,我們先來看看回調地獄(Callback Hell)是個什麼概念。

回撥是一種我們推崇的非同步呼叫方式,但也會遇到問題,也就是回撥的巢狀。當需要多個非同步回撥一起書寫時,就會出現下面的程式碼(以 js 為例):

asyncFunc1(opt, (...args1) => {
   asyncFunc2(opt, (...args2) => {
       asyncFunc3(opt, (...args3) => {
            asyncFunc4(opt, (...args4) => {
                // some operation
            });
        });
    });
});

雖然在 Java 業務程式碼中很少出現回撥的多層巢狀,這樣的程式碼不易讀,巢狀太深修改也麻煩。於是 ES6 提出了 Promise 模式來解決回撥地獄的問題。可能就會有人想問:Java 中存在 Promise 模式嗎?答案是肯定的。

前面提到了 Netty 和 Guava 的擴充套件都提供了 addListener 這樣的介面,用於處理 Callback 呼叫,但其實 jdk1.8 已經提供了一種更為高階的回撥方式:CompletableFuture。首先嚐試用 CompletableFuture 來解決回撥的問題。

CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
    TimeUnit.SECONDS.sleep(5);
    return 100;
});
completableFuture.whenComplete((result, e) -> {
    System.out.println("結果:" + result);
});
Thread.currentThread().join();

五、Netty 中的 Promise 模式

Netty 文件說明 Netty 的網路操作都是非同步的, 在原始碼上大量使用了 Future/Promise 模型,在 Netty 裡面也是這樣定義的:

  • Future 介面定義了 isSuccess(), isCancellable(), cause() 這些判斷非同步執行狀態的方法。(read-only)
  • Promise 介面在 extends future 的基礎上增加了 setSuccess(), setFailure() 這些方法。(writable)
public interface Future<V> {
    // 取消非同步操作
    boolean cancel(boolean mayInterruptIfRunning);
    // 非同步操作是否取消
    boolean isCancelled();
    // 非同步操作是否完成,正常終止、異常、取消都是完成
    boolean isDone();
    // 阻塞直到取得非同步操作結果
    V get() throws InterruptedException, ExecutionException;
    // 同上,但最長阻塞時間為timeout
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

Netty 對 JDK 的 Future 進行了擴充套件

public interface Future<V> extends java.util.concurrent.Future<V> {
    // 非同步操作完成且正常終止
    boolean isSuccess();
    // 非同步操作是否可以取消
    boolean isCancellable();
    // 非同步操作失敗的原因
    Throwable cause();
    // 新增一個監聽者,非同步操作完成時回撥,類比javascript的回撥函式
    Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
    Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
    // 阻塞直到非同步操作完成
    Future<V> await() throws InterruptedException;
    // 同上,但非同步操作失敗時丟擲異常
    Future<V> sync() throws InterruptedException;
    // 非阻塞地返回非同步結果,如果尚未完成返回null
    V getNow();
}

Netty 的 Promise 對又對 Future 進行了擴充套件

public interface Promise<V> extends Future<V> {    
    Promise<V> setSuccess(V result);
    boolean trySuccess(V result);
    Promise<V> setFailure(Throwable cause);
    boolean tryFailure(Throwable cause);
    boolean setUncancellable();
}

DefaultChannelPromise 是 ChannelPromise 的實現類,它是實際執行時的 Promoise 例項。

參考:

  1. 《併發程式設計 Promise, Future 和 Callback》:https://ifeve.com/promise-future-callback/

每天用心記錄一點點。內容也許不重要,但習慣很重要!