Netty進階:Futrue&Promise原始碼解析
文章目錄
併發程式設計中,我們通常會用到一組非阻塞的模型:Future 和 Callback、Promise。其中的 Future 表示一個可能還沒有實際完成的非同步任務的結果,針對這個結果可以新增 Callback 以便在任務執行成功或失敗後做出對應的操作,而Promise交由任務執行者,任務執行者通過 Promise 可以標記任務完成或者失敗。 這一套模型是很多非同步非阻塞架構的基礎。
JDK中實現了Future和CallBack的模式,並沒有Promise以及實現。Netty有類似“呼叫者和執行部件以非同步的方式互動通訊結果”的需求(要知道eventloop本質上是一個ScheduledExecutorService,ExecutorService是一種“提交-執行”模型實現,也存線上程間非同步方式通訊和執行緒安全問題),所以Netty實現了一套完整的Futre 和 Callback、Promise架構。
關於Future以及FutureTask的原理請見併發程式設計之Future原始碼解析
1. Future&Promise
也許你已經使用過JDK的Future物件,該介面的方法如下:
// 取消非同步操作
boolean cancel(boolean mayInterruptIfRunning);
// 非同步操作是否取消
boolean isCancelled();
// 非同步操作是否完成,正常終止、異常、取消都是完成
boolean isDone();
// 阻塞直到取得非同步操作結果
V get() throws InterruptedException, ExecutionException;
// 同上,但最長阻塞時間為timeout
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
介面中只有isDone()方法判斷一個非同步操作是否完成,但是對於完成的定義過於模糊,JDK文件指出如果任務正在執行過程中返回false,正常終止、丟擲異常、使用者取消都會使isDone()方法返回true,其實這是JDK介面的約定。在我們的使用中,我們極有可能是對這三種情況分別處理,而JDK這樣的設計不能滿足我們的需求。
對於一個非同步操作,我們更關心的是這個非同步操作觸發或者結束後能否再執行一系列動作。Netty擴充套件了JDK的Future介面,使其能解決上面的問題。擴充套件的方法如下:
// 非同步操作完成且正常終止
boolean isSuccess();
// 非同步操作是否可以取消
boolean isCancellable();
// 非同步操作失敗的原因
Throwable cause();
// 新增一個監聽者,非同步操作完成時回撥,類比javascript的回撥函式
Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
// 阻塞直到非同步操作完成
Future<V> await() throws InterruptedException;
// 同上,但非同步操作失敗時丟擲異常
Future<V> sync() throws InterruptedException;
// 非阻塞地返回非同步結果,如果尚未完成返回null
V getNow();
Netty中future的特點:
- 操作狀態分為success,fail,canceled三種。
- 並且通過addlisteners()方法可以添加回調操作,即異常、取消觸發或者完成時需要進行的操作,類似於js中的callBack()。
- await()和sync(),可以以阻塞的方式等待非同步完成;getnow()可以獲得非同步操作的結果,如果還未完成則返回Null。
future只有兩種執行結果狀態,unconpleted和conpleted,每種狀態對應方法的返回值如下圖
* +---------------------------+
* | Completed successfully |
* +---------------------------+
* +----> isDone() = true |
* +--------------------------+ | | isSuccess() = true |
* | Uncompleted | | +===========================+
* +--------------------------+ | | Completed with failure |
* | isDone() = false | | +---------------------------+
* | isSuccess() = false |----+----> isDone() = true |
* | isCancelled() = false | | | cause() = non-null |
* | cause() = null | | +===========================+
* +--------------------------+ | | Completed by cancellation |
* | +---------------------------+
* +----> isDone() = true |
* | isCancelled() = true |
* +---------------------------+
Future介面中的方法都是getter方法而沒有setter方法,也就是說這樣實現的Future子類的狀態是不可變的,如果我們想要變化,那該怎麼辦呢?Netty提供的解決方法是:使用可寫的Future即Promise。Promise介面擴充套件的方法如下:
// 標記非同步操作結果為成功,如果已被設定(不管成功還是失敗)則丟擲異常IllegalStateException
Promise<V> setSuccess(V result);
// 同上,只是結果已被設定時返回False
boolean trySuccess(V result);
Promise<V> setFailure(Throwable cause);
boolean tryFailure(Throwable cause);
// 設定結果為不可取消,結果已被取消返回False
boolean setUncancellable();
需要注意的是:Promise介面繼承自Future介面,它提供的setter方法與常見的setter方法大為不同。Promise從Uncompleted–>Completed的狀態轉變有且只能有一次,也就是說setSuccess和setFailure方法最多隻會成功一個,此外,在setSuccess和setFailure方法中會通知註冊到其上的監聽者。
至此,我們從總體上了解了Future和Promise的原理。我們再看一下類圖:
2. AbstractFuture
AbstractFuture主要實現Future的get()方法,取得Future關聯的非同步操作結果,
protected EventExecutor executor() {
return executor;
}
@Override
public V get() throws InterruptedException, ExecutionException {
//阻塞直到非同步任務完成
await();
Throwable cause = cause();
if (cause == null) {
//獲得非同步操作結果
return getNow();
}
//操作失敗則丟擲異常
if (cause instanceof CancellationException) {
throw (CancellationException) cause;
}
throw new ExecutionException(cause);
}
@Override
public boolean isDone() {
return true;
}
@Override
public Future<V> sync() throws InterruptedException {
return this;
}
3.Completefuture
completedfuture表示已經完成非同步操作,該類在非同步操作結束時建立,使用者使用addlistener()方法提供非同步操作方法。Completefuture持有執行任務的執行緒,用來執行listener中的任務。Completefuture表示已經完成非同步操作,所以isdone()方法返回true;並且sync()方法和await()方法會立即返回。
private final EventExecutor executor;
protected CompleteFuture(EventExecutor executor) {
this.executor = executor;
}
protected EventExecutor executor() {
return executor;
}
@Override
public boolean isDone() {
return true;
}
@Override
public Future<V> sync() throws InterruptedException {
return this;
}
我們再看addListener()和removeListener()方法:
public Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
if (listener == null) {
throw new NullPointerException("listener");
}
DefaultPromise.notifyListener(executor(), this, listener);
return this;
}
addListener內部呼叫DefaultPromise的靜態方法,關於這部分內容在DefaultPromise中講解,Completefuture是表示完成的非同步結果,大致是將listener新增到任務佇列,交給EventExecutor去執行。
Completefuture類總結:
- Conpletefuture中儲存了eventexecutor的資訊,用來執行listener中的任務。
- 呼叫了future的addlistener()方法後,將listener中的操作封裝成runnble任務扔到eventexecutor中的任務佇列中等待執行。
- Completefuture表示已經完成非同步操作,狀態是isdone。
- GenericFutureListener 繼承了JDK提供的空介面EventListener,內部只有一個方法operationComplete。
4.Channelfuture&Completechannelfuture
Channelfuture繼承future的介面,顧名思義,該介面與通道操作有關,所以在channelfuture介面中,除了覆蓋future的功能外,只提供了一個channel()抽象方法。
Channel channel();
Completechannelfuture類其實都可以猜出來,實現Channelfuture介面,繼承Completefuture類。
abstract class CompleteChannelFuture extends CompleteFuture<Void> implements ChannelFuture {
尖括號中的泛型表示Future關聯的結果,此結果為Void,意味著CompleteChannelFuture不關心這個特定結果即get()相關方法返回null。也就是說,我們可以將CompleteChannelFuture純粹的視為一種回撥函式機制。
下面是它的成員變數和建構函式:
private final Channel channel;
protected CompleteChannelFuture(Channel channel, EventExecutor executor) {
super(executor);
if (channel == null) {
throw new NullPointerException("channel");
}
this.channel = channel;
}
CompleteChannelFuture的大部分方法實現中,只是將方法返回的Future覆蓋為ChannelFuture物件(ChannelFuture介面的要求),executor()方法:
@Override
protected EventExecutor executor() {
EventExecutor e = super.executor();
if (e == null) {
return channel().eventLoop();
} else {
return e;
}
}
如果父類中eventexecutor不為空,則返回父類中的eventexecutor,否則返回channel中儲存的eventexecutor。
5.Succeededchannelfuture/FailedChannelFuture
Succeeded/FailedChannelFuture為特定的兩個非同步操作結果,回憶Future狀態的講解:
Succeeded: isSuccess() = true, cause() = null;
Failed: isSuccess() = false, cause() = non-null
程式碼也是寫死的:
#FailedChannelFuture
@Override
public Throwable cause() {
return cause;
}
#Succeededchannelfuture
@Override
public boolean isSuccess() {
return true;
}
6.DefaultPromise
DefaultPromise繼承了AbstractFuture,同時實現了Promise介面。看其中的static欄位:
// 可以巢狀的Listener的最大層數,可見最大值為8
private static final int MAX_LISTENER_STACK_DEPTH = Math.min(8,
SystemPropertyUtil.getInt("io.netty.defaultPromise.maxListenerStackDepth", 8));
// result欄位由使用RESULT_UPDATER更新
private static final AtomicReferenceFieldUpdater<DefaultPromise, Object> RESULT_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(DefaultPromise.class, Object.class, "result");;
// 此處的Signal是Netty定義的類,繼承自Error,非同步操作成功且結果為null時設定為改值
private static final Signal SUCCESS = new Object();
private static final Signal UNCANCELLABLE = new Object();
// 非同步操作失敗時儲存異常原因
private static final CauseHolder CANCELLATION_CAUSE_HOLDER = new CauseHolder(...);
巢狀的Listener,是指在listener的operationComplete方法中,可以再次使用future.addListener()繼續新增listener,Netty限制的最大層數是8,使用者可使用系統變數io.netty.defaultPromise.maxListenerStackDepth設定。
再看其中的私有欄位:
// 非同步操作結果
private volatile Object result;
// 執行listener操作的執行器
private final EventExecutor executor;
// 監聽者
private Object listeners;
// 阻塞等待該結果的執行緒數
private short waiters;
// 通知正在進行標識
private boolean notifyingListeners;
listeners是一個Object型別。這似乎不合常理,一般情況下我們會使用一個集合或者一個數組。Netty之所以這樣設計,是因為大多數情況下listener只有一個,用集合和陣列都會造成浪費。當只有一個listener時,該欄位為一個GenericFutureListener物件;當多餘一個listener時,該欄位為DefaultFutureListeners,可以儲存多個listener。內部是GenericFutureListener陣列,我們分析關鍵方法addListener():
@Override
public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
synchronized (this) {
addListener0(listener); // 保證多執行緒情況下只有一個執行緒執行新增操作
}
if (isDone()) {
notifyListeners(); // 非同步操作已經完成通知監聽者
}
return this;
}
從程式碼中可以看出,在新增Listener時,如果非同步操作已經完成,則會notifyListeners():
private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) {
if (listeners == null) {
listeners = listener; // 只有一個
} else if (listeners instanceof DefaultFutureListeners) {
((DefaultFutureListeners) listeners).add(listener); // 大於兩個
} else {
// 從一個擴充套件為兩個
listeners = new DefaultFutureListeners((GenericFutureListener<? extends Future<V>>) listeners, listener);
}
}
在新增Listener時,如果非同步操作已經完成,則會notifyListeners():
private void notifyListeners() {
EventExecutor executor = executor();
if (executor.inEventLoop()) { //執行執行緒為指定執行緒
final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
final int stackDepth = threadLocals.futureListenerStackDepth(); // 巢狀層數
if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
// 執行前增加巢狀層數
threadLocals.setFutureListenerStackDepth(stackDepth + 1);
try {
notifyListenersNow();
} finally {
// 執行完畢,無論如何都要回滾巢狀層數
threadLocals.setFutureListenerStackDepth(stackDepth);
}
return;
}
}
// 外部執行緒則提交任務給執行執行緒
safeExecute(executor, () -> { notifyListenersNow(); });
}
如果是外部執行緒,則只能提交任務到指定Executor,其中的操作最終由指定Executor執行。不管哪個執行緒都會執行notifyListenersNow()方法:
private void notifyListenersNow() {
Object listeners;
// 此時外部執行緒可能會執行新增Listener操作,所以需要同步
synchronized (this) {
if (notifyingListeners || this.listeners == null) {
// 正在通知或已沒有監聽者(外部執行緒刪除)直接返回
return;
}
notifyingListeners = true;
listeners = this.listeners;
this.listeners = null;
}
for (;;) {
if (listeners instanceof DefaultFutureListeners) { // 通知多個
notifyListeners0((DefaultFutureListeners) listeners);
} else { // 通知單個
notifyListener0(this, (GenericFutureListener<? extends Future<V>>) listeners);
}
synchronized (this) {
// 執行完畢且外部執行緒沒有再新增監聽者
if (this.listeners == null) {
notifyingListeners = false;
return;
}
// 外部執行緒添加了監聽者繼續執行
listeners = this.listeners;
this.listeners = null;
}
}
}
private void notifyListeners0(DefaultFutureListeners listeners) {
GenericFutureListener<?>[] a = listeners.listeners();
int size = listeners.size();
for (int i = 0; i < size; i ++) {
notifyListener0(this, a[i]);
}
}
在該方法中,首先將Listeners取出來,然後將其清空(每次觸發完listeners都會將原來的listeners清空),然後執行listener中具體的操作,執行完操作,會再次檢查是否又有listeners新增進來,確保無誤後,從方法中退出。
分析完了Promise最重要的addListener()和notifyListener()方法,在原始碼中還有static的notifyListener()方法,這些方法是CompleteFuture使用的,對於CompleteFuture,新增監聽者的操作判斷任務是否完成,直接執行Listener中的方法即可。
那麼,當我們新增完Listener後,什麼時候會執行,當然除了addListener方法中isDone為true是一種特殊情況,正常情況下,不會為true,Netty中用通知的方式來對後續的listener中的操作,操作結果等進行控制。通知的前提包括success,fail,cancel三種狀態。三個操作至多隻能呼叫一個且同一個方法至多生效一次,再次呼叫會丟擲異常(set)或返回失敗(try)。這些設定方法原理相同,我們以setSuccess()為例分析:
public Promise<V> setSuccess(V result) {
if (setSuccess0(result)) {
notifyListeners(); // 可以設定結果說明非同步操作已完成,故通知監聽者,就是上面分析的方法
return this;
}
throw new IllegalStateException("complete already: " + this);
}
private boolean setSuccess0(V result) {
// 為空設定為Signal物件SUCCESS(Object)
return setValue0(result == null ? SUCCESS : result);
}
private boolean setValue0(Object objResult) {
// 只有結果為null或者UNCANCELLABLE時才可設定且只可以設定一次
if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
checkNotifyWaiters(); // 通知等待的執行緒
return