1. 程式人生 > >Future Promise 模式(netty原始碼9)

Future Promise 模式(netty原始碼9)

1. Future/Promise 模式

1.1. ChannelFuture的由來


由於Netty中的Handler 處理都是非同步IO操作,結果是未知的。

Netty繼承和擴充套件了JDK Future的API,定義了自身的Future系列型別,實現非同步操作結果的獲取和監控。


其中,最為重要的是ChannelFuture 。

程式碼如下:

public interface ChannelFuture extends Future<Void> {

    //...

    Channel channel();

    @Override

    ChannelFuture addListener(GenericFutureListener<?
extends Future<? super Void>> listener); @Override ChannelFuture sync() throws InterruptedException; //... }


之所以命名為ChannelFuture,表示跟Channel的操作有關。

ChannelFuture用於獲取Channel相關的操作結果,新增事件監聽器,取消IO操作,同步等待。


1.2. 來自Netty的官方建議


java.util.concurrent.Future是Java提供的介面,提供了對非同步操作的簡單幹預。

Future介面定義了isDone()、isCancellable(),用來判斷非同步執行狀態。Future介面的get方法,可以用來獲取結果。get方法首先會判斷任務是否執行完成,如果完成就返回結果,否則阻塞執行緒,直到任務完成。


Netty官方文件直接說明——Netty的網路操作都是非同步的,Netty原始碼上大量使用了Future/Promise模式。


如果使用者操作呼叫了sync或者await方法,會在對應的future物件上阻塞使用者執行緒,例如future.channel().closeFuture().sync()。


Netty 的Future 介面,在繼承了java.util.concurrent.Future的基礎上,增加了一系列監聽器方法,比如addListener()、removeListener() 等等。Netty強烈建議,通過新增監聽器的方式獲取IO結果,而不是通過JDK Future的同步等待的方式去獲取IO結果。


1.3. Netty 的 Future 介面


Netty擴充套件了Java的Future,增加了監聽器Listener介面,通過監聽器可以讓非同步執行更加有效率,不需要通過get來等待非同步執行結束,而是通過監聽器回撥來精確地控制非同步執行結束的時間點。

這一點,正好是Netty在Future模式的最主要的改進。

public interface Future<V> extends java.util.concurrent.Future<V> {

    boolean isSuccess();

    boolean isCancellable();

    Throwable cause();

    Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);

    Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);

    Future<V> sync() throws InterruptedException;

    boolean awaitUninterruptibly(long timeout, TimeUnit unit);

    boolean awaitUninterruptibly(long timeoutMillis);

    V getNow();

    boolean cancel(boolean mayInterruptIfRunning);

}
1.4. ChannelFuture使用的例項

Netty的出站和入站操作,都是非同步的。

以最為經典的NIO出站操作——write出站為例,說一下ChannelFuture的使用。

程式碼如下:

ChannelFuture future = ctx.channel().write(msg);
future.addListener(
        new ChannelFutureListener()
        {
            @Override
            public void operationComplete(ChannelFuture future)
            {
                // write操作完成後的回撥程式碼
            }
        });


在write操作呼叫後,Netty並沒有完成對Java NIO底層連線的寫入操作,出站操作是非同步執行的。

如果需要獲取IO結果,可以使用回撥的方式。


使用ChannelFuture的非同步完成後的回撥,需要搭配使用另外的一個介面ChannelFutureListener ,他從父介面哪裡繼承了一個被回撥到的operationComplete操作完成的方法。

ChannelFutureListener 的父親介面是GenericFutureListener 介面。

定義如下:

public interface GenericFutureListener
                 <F extends Future<?>> extends EventListener
{
      void operationComplete(F future) throws Exception;
}


非同步操作完成後的回撥程式碼,放在operationComplete方法中的實現中,就可以了。


1.5. Netty的 Promise介面


Netty的Future,只是增加了監聽器。整個非同步的狀態,是不能進行設定和修改的。

換句話說,Future是隻讀的,是不可以寫的。

於是,Netty的 Promise介面擴充套件了Netty的Future介面,它表示一種可寫的Future,就是可以設定非同步執行的結果。

部分原始碼如下:

public interface Promise<V> extends Future<V> {

    Promise<V> setSuccess(V result);

    Promise<V> setFailure(Throwable cause);

    boolean setUncancellable();

   //....

}
在IO操作過程,如果順利完成、或者發生異常,都可以設定Promise的結果,並且通知Promise的Listener們。


而ChannelPromise介面,則繼承擴充套件了Promise和ChannelFuture。所以,ChannelPromise既綁定了Channel,又具備了監聽器的功能,還可以設定IO操作的結果,是Netty實際程式設計使用的最多的介面。


在AbstratChannel的程式碼中,相當多的IO操作,都會返回ChannelPromise型別例項作為呼叫的返回值。 通過這個返回值,客戶程式可以用於讀取IO操作的結果,執行IO操作真正完成後的回撥。


1.6. ChannelPromise的監控流程

在AbstractChannel中,定義了幾個對Channel的非同步狀態進行監控的Promise和Future成員,用於監控Channel的連線是否成功,連線是否關閉。

原始碼如下:

public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {

        //連線成功的監控
        private final ChannelFuture succeededFuture = new SucceededChannelFuture(this, null);

        //連線關閉的監控
        private final CloseFuture closeFuture = new CloseFuture(this);

//...
}
對於每個Channel物件,都會有唯一的一個CloseFuture 成員,用來表示關閉的非同步干預。如果要監控Channel的關閉,或者同步等待Channel關閉。

一般情況下,在應用程式中使用如下的程式碼:

// Start the server.
ChannelFuture f = b.bind(PORT).sync();

// Wait until the server socket is closed.
f.channel().closeFuture().sync();


一般來說,編寫以上程式碼的都是在Main執行緒中用來啟動ServerBootStrap的,所以Main執行緒會被阻塞,保證服務端Channel的正常執行。

上面的程式碼中,channel.closeFuture()不做任何操作,只是簡單的返回channel物件中的closeFuture物件。而CloseFuture的sync方法,會將當前執行緒阻塞在CloseFuture上。


那麼,f.channel().closeFuture().sync() 實際是如何工作的呢?


1.7. CloseFuture的sync 同步方法


CloseFuture繼承了DefaultPromise的sync同步方法。

DefaultPromise的程式碼如下:

public class DefaultPromise<V> extends AbstractFuture<V>
                                                 implements Promise<V>
{

     private volatile Object result;
      //...
    @Override
    public Promise<V>  sync() throws InterruptedException {

       await();
        //...
    }

   @Override
    public Promise<V> await() throws InterruptedException {

        //...
        synchronized (this) {
            while (!isDone()) {
                incWaiters();
               try {
                    wait();  //阻塞了,死等
                } finally {
                   decWaiters();
                }
            }
        }
        return this;
    }
//...

}


從原始碼可以看出,sync方法,呼叫了await方法。

在await方法中,CloseFuture 使用java 基礎的synchronized 方法進行執行緒同步;並且,使用CloseFuture.wait / notify 這組來自Object根類中的古老方法進行執行緒之間的等待和喚醒。


在await方法,不斷的自旋,判斷當前的 CloseFuture 例項的結果是否已經完成,如果沒有完成 !isDone() ,就不斷的等待。一直到 isDone() 的值為true。

isDone() 的原始碼如下:

@Override
public boolean isDone() {
        return isDone0(result);
}

private static boolean isDone0(Object result) {
        return result != null && result != UNCANCELLABLE;
}


CloseFuture的 isDone() 的條件是否能夠滿足,和Channel的close 關閉連線的出站操作有關。


下一步,我們來看 isDone() 的條件,如何才能夠滿足?



1.8. close 出站處理流程


在Netty中,close 關閉連線的操作,屬於所有的出站操作的一種。關於Netty出站處理的流程,在前面的文字中,已經非常詳細的介紹了。這裡不再贅述,只是簡單的列出一個流程圖。

close 關閉連線的出站操作,其流程如下圖所示:

wps6221.tmp


一溜兒下來,最終會落地到unsafe.doClose 方法。

看看unsafe.doClose,是如何與CloseFuture的 isDone() 的條件進行關聯的。


1.9. unsafe.doClose


unsafe.doClose 方法中,設定了CloseFuture 的result值。

unsafe.doClose 原始碼如下:


protected abstract class AbstractUnsafe implements Unsafe

{

   private void close(final ChannelPromise promise,…) {
        //…
        try {
            // Close the channel
            doClose0(promise);
        } finally {
            // Fail all the queued messages.
            outboundBuffer.failFlushed(cause, notify);
            outboundBuffer.close(closeCause);
        }
         //……
    }
    }
   private void doClose0(ChannelPromise promise) {
    try {
        doClose();
        closeFuture.setClosed();
        safeSetSuccess(promise);
    } catch (Throwable t) {
        closeFuture.setClosed();
        safeSetFailure(promise, t);
    }
  }

  //……

}
1.10. closeFuture.setClosed()

在closeFuture.setClosed() 設定關閉的結果的過程中,主要完成以下三個工作:

1  設定result的值

2  notifyAll,喚醒在本Promise上等待的執行緒

3  回撥listener

closeFuture.setClosed()的主要原始碼如下:

boolean setClosed() {
    return super.trySuccess();
}

@Override

//這個定義在父類中
public boolean trySuccess(V result) {
    if (setSuccess0(result)) {
notifyListeners();
        return true;
    }
    return false;
}

private boolean setSuccess0(V result) {
    return setValue0(result == null ? SUCCESS : result);
}

上面的 notifyListeners()呼叫,就是用來喚醒了等待在closeFuture 例項物件上的等待執行緒。

到了這裡,終於鬆了一口氣了。

之前通過 f.channel().closeFuture().sync()  同步操作,阻塞在哪兒的Main執行緒,終於通過channel.close() 方法,給喚醒了。

1.11. 警惕死鎖:Reactor執行緒不能sync

在上面的原始碼中,最終觸發future物件的notify動作的執行緒,都是eventLoop執行緒(Reactor執行緒)。一般情況下,Channel的出站和入站操作,都是在eventLoop執行緒的輪詢任務中完成的。

例如對close連線關閉的sync,因為不論是使用者直接關閉或者eventLoop的輪詢狀態關閉,都會在eventLoop的執行緒內完成notify動作。

所以不要在Reactor執行緒內呼叫future物件的sync或者await方法。如果在Reactor執行緒進行sync或者await,會有可能引起死鎖。

為什麼呢?在Reactor執行緒進行sync時,會進入等待狀態,等待Future(DefaultPromise)的 isDone 的條件滿足。通過前面的例子,我們已經看到了,而Future的isDone的條件,又需要Reactor執行緒的出站或者入站操作來滿足。這是,Reactor執行緒既然已經處於等待狀態,怎麼可能再進行其他的出站或者入站操作呢?相當於自己等自己,這就是典型的死鎖。

在實際開發中,由於應用程式程式碼都是編寫在自定義的channelHandler處理器中,而channelHandler是在eventLoop執行緒(Reactor執行緒)內執行的。

所以,不能在channelHandler中呼叫Future(DefaultPromise)的sync或者await兩個同步方法。

正確的做法是:通過給Future(DefaultPromise) 增加listeners監聽器 的方式,來干預非同步操作的過程,處理非同步操作的結果。