1. 程式人生 > >netty原始碼解析(4.0)-29 Future模式的實現

netty原始碼解析(4.0)-29 Future模式的實現

  Future模式是一個重要的非同步併發模式,在JDK有實現。但JDK實現的Future模式功能比較簡單,使用起來比較複雜。Netty在JDK Future基礎上,加強了Future的能力,具體體現在:

  1. 更加簡單的結果返回方式。在JDK中,需要使用者自己實現Future物件的執行及返回結果。而在Netty中可以使用Promise簡單地呼叫方法返回結果。
  2. 更加靈活的結果處理方式。JDK中只提供了主動得到結果的get方法,要麼阻塞,要麼輪詢。Netty除了支援主動get方法外,還可以使用Listener被動監聽結果。
  3. 實現了進度監控。Netty提供了ProgressiveFuture、ProgressivePromise和GenericProgressiveFutureListener介面及其實現,支援對執行程序的監控。

  吹了那麼多牛,有一個關鍵問題還沒弄清楚:Future到底是幹嘛的?io.netty.util.concurrent.Future程式碼的第一行註釋簡潔第回答了這個問題:Future就是非同步操作的結果。這裡面有三個關鍵字:非同步,操作,結果。首先,Future首先是一個“結果”;其次這個結果產生於一個“操作”,操作具體是什麼可以隨便定義;最後這個操作是"非同步"執行的,這就意味著“操作”可能在另一個執行緒中併發執行,也可能隨後在同一個執行緒中執行,什麼時候產生結果是一件不確定的事。

  非同步呼叫過程的一般過程是:呼叫方喚起一個非同步操作,在接下來的某個恰當的時間點得到的非同步操作操作的結果。要正確地完成上述步驟,需要解決以下幾個問題:

  • 怎樣維護這個呼叫狀態?
  • 如何獲取非同步操作的結果?
  • 何時處理結果?

  io.netty.util.concurrent.DefaultPromise是Future的預設實現,以上三個問題的答案都能在這個類的程式碼中找到。

 

DefaultPromise的派生體系

  下面是DefaultPromis及其父類,介面的宣告:

  public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> 

  public abstract class AbstractFuture<V> implements Future<V>

  public interface Promise<V> extends Future<V> 

  public interface Future<V> extends java.util.concurrent.Future<V> 

   可以看出,DefaultPromise派生自AbstractFuture類,並實現了Promise介面。抽象型別AbstractFuture派生自Future, 介面Promise派生自Future。Future派生自JDK的Future介面。

  和JDK的Future相比,Netty的Future介面增加一些自己的方法:

   /**
     當操作成功時返回true*/
    boolean isSuccess();

    /**
   只有當操作可以被取消時返回true
*/ boolean isCancellable(); /** 返回操作的異常*/ Throwable cause(); /** 新增一個監聽器到future。當操作完成(成功或失敗都算完成,此事isDone()返回true)時, 會通知這個監聽器。如果新增時操作已經完成,
   這個監聽器會立即被通知。*/ Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener); /** 和上個方法一樣,可以同時新增多個監聽器*/ Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners); /** 刪除指定的監聽器, 如果這個監聽器還沒被通知的話。*/ Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener); /** 功能和上個方法一樣,可以同時刪除多個監聽器。*/ Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners); /** 同步等待直到操作完成。會被打斷。 */ Future<V> sync() throws InterruptedException; /**    同步等著知道操作完成。不會被打斷。 */ Future<V> syncUninterruptibly(); /** 同sync*/ Future<V> await() throws InterruptedException; /** 同synUniterruptibliy*/ Future<V> awaitUninterruptibly(); /** 等待,直到操作完成或超過指定的時間。會被打斷。*/ boolean await(long timeout, TimeUnit unit) throws InterruptedException; /** 同上*/ boolean await(long timeoutMillis) throws InterruptedException; /** 同上,不會被打斷。*/ boolean awaitUninterruptibly(long timeout, TimeUnit unit); /** 同上。*/ boolean awaitUninterruptibly(long timeoutMillis); /** 立即得到結果,不會阻塞。如果操作沒有完成或沒有成功,返回null*/ V getNow();

  Netty的Future最大的特點是增加了Listener被動接收任務完成通知,下面是兩個Listener介面的定義:

public interface GenericFutureListener<F extends Future<?>> extends EventListener {
    void operationComplete(F future) throws Exception;
}

public interface GenericProgressiveFutureListener<F extends ProgressiveFuture<?>> extends GenericFutureListener<F> {
    void operationProgressed(F future, long progress, long total) throws Exception;
}

  把一個listener新增到future之後。當非同步操作完成之後,listener會被通知一次,同時會回撥operationComplete方法。引數future是當前通知的future,這意味這,一個listener可以被新增到多個future中。

  當非同步操作進度傳送變化時,listener會被通知,同時會回撥operationProgressed方法。progress是當前進度,total是總進度。progress==total表示操作完成。如果不知道何時完成操作progress=-1。

  Promise定義的方法:

    /**
    設定結果。把這個future設定為success,通知所有的listener,
  如果這個future已經是success或failed(操作已經完成),會丟擲IllegalStateException
*/ Promise<V> setSuccess(V result); /**
同上。只有在操作沒有完成的時候才會生效,且會返回true */ boolean trySuccess(V result); /** 設定異常。把這個future設定為failed狀態,通知所有的listener.
如果這個future已經完成,會丟擲IllegalStateException */ Promise<V> setFailure(Throwable cause); /** 同上。只有在操作沒有完成時才會生效,且返回ture */ boolean tryFailure(Throwable cause); /** 設定當前前future的操作不能被取消。這個future沒有完成且可以設定成功或這個future已經完成,返回true。否則返回false */ boolean setUncancellable();

 

DefaultPromise的設計

關鍵屬性

  volatile Object result;

  非同步操作的結果。可以通過它的值知道當前future的狀態。

  final EventExecutor executor;

  通知listener的執行緒。

  Object listeners;

  維護新增到當前future的listener物件。

  short waiters;

  記錄當前真正等待結果的執行緒數量。

  boolean notifyingListeners;

  是否正在通知listener,防止多執行緒併發執行通知操作。

 

狀態管理

  future有4種狀態: 未完成, 未完成-不能取消,完成-成功,完成-失敗。使用isDone()判斷是否完成,它程式碼如下:

1     @Override
2     public boolean isDone() {
3         return isDone0(result);
4     }
5 
6     private static boolean isDone0(Object result) {
7         return result != null && result != UNCANCELLABLE;
8     }

  第7行是判斷當前完成狀態的。result != null 且 result != UNCANCELLABLE,表示處於完成狀態。

  result預設是null, 此時future處於未完成狀態。可以使用setUncancellable方法把它設定成為完成-不能取消狀態。

1     @Override
2     public boolean setUncancellable() {
3         if (RESULT_UPDATER.compareAndSet(this, null, UNCANCELLABLE)) {
4             return true;
5         }
6         Object result = this.result;
7         return !isDone0(result) || !isCancelled0(result);
8     }

  第3行,使用原子操作設定result的值,只有result==null時才能把result設定成UNCANCELLABLE。當result==UNCANCELLABLE時,不允許取消非同步操作。

  使用isSuccess方法判斷future是否處於完成-成功狀態。

1     @Override
2     public boolean isSuccess() {
3         Object result = this.result;
4         return result != null && result != UNCANCELLABLE && !(result instanceof CauseHolder);
5     }

  第4行是完成-成功狀態result的取值:除null, UNCANCELLABLE和CauseHolder物件的任何值。

  只有滿足isDone() && !isSuccess()時,future處於完成失敗狀態,可以使用cause方法獲取異常。

  呼叫setSuccess和trySuccess方法,能夠把狀態轉換成完成-成功。

 1     @Override
 2     public Promise<V> setSuccess(V result) {
 3         if (setSuccess0(result)) {
 4             notifyListeners();
 5             return this;
 6         }
 7         throw new IllegalStateException("complete already: " + this);
 8     }
 9     
10     private boolean setSuccess0(V result) {
11         return setValue0(result == null ? SUCCESS : result);
12     }

  第3行嘗試把狀態設定成完成-成功狀態。如果可以,在第4行通知所有的listener。否則第7行丟擲錯誤。第11行給出了成功的預設值SUCCESS。trySuccess少了第7行,不會丟擲異常。

  呼叫setFailure和tryFailure方法,能夠包狀態轉換成完成-失敗狀態。

 1     @Override
 2     public Promise<V> setFailure(Throwable cause) {
 3         if (setFailure0(cause)) {
 4             notifyListeners();
 5             return this;
 6         }
 7         throw new IllegalStateException("complete already: " + this, cause);
 8     }
 9 
10     private boolean setFailure0(Throwable cause) {
11         return setValue0(new CauseHolder(checkNotNull(cause, "cause")));
12     }

  第3行嘗試把專題設定成完成-失敗狀態。如果可以,在第4行通知所有listener。否則在第7行丟擲異常。第11行把異常包裝成CauseHolder物件。tryFailure少了第7行,不會丟擲異常。

 

獲取非同步操作的結果

  當非同步操作完成時,呼叫Promise提供的setSuccess和trySuccess設定成功的結果,呼叫setFailure和tryFailure設定異常結果。不論什麼結果,都會使用setValue0方法儲存到result屬性上。

1     private boolean setValue0(Object objResult) {
2         if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
3             RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
4             checkNotifyWaiters();
5             return true;
6         }
7         return false;
8     }

  第2,3行,使用原子操作設定result的值,只有result==null或result==UNCANCELLABLE時,才能設定成功。如果設定成功,在第4行喚醒所有等待中的執行緒。可以使用get方法得到result值。如果isSucess()==true, result的值是SUCCESS或非同步操作的結果。否則result的值是CauseHolder物件,此時可以呼叫cause方法得到異常物件。

  使用get或cause,只有在非同步操作完成後才能順利得到結果。可以使用listener,被動等待操作完成通知。

 

使用listener非同步通知處理結果

  Future的listener是必須實現GenericFutureListener介面,呼叫方法可以在operationComplete方法中處理非同步操作的結果。

  listeners屬性用來儲存使用addListener,addListeners方法新增到future的listener。listeners可能使用一個GenericFutureListener物件,也可能是一個GenericFutureListener陣列。所有新增listener方法都會呼叫addListener0方法新增listener。

1     private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) {
2         if (listeners == null) {
3             listeners = listener;
4         } else if (listeners instanceof DefaultFutureListeners) {
5             ((DefaultFutureListeners) listeners).add(listener);
6         } else {
7             listeners = new DefaultFutureListeners((GenericFutureListener<? extends Future<V>>) listeners, listener);
8         }
9     }

  這段程式碼中使用了一個DefaultFutureListeners類,它內部維護了一個GenericFutureListener陣列。

  當一次操作完成時,會呼叫notifyListeners方法通知listeners中所有的listener,並呼叫listener的operationComplete方法。只有當isDone()==true時才會呼叫notifyListeners方法。觸發點在下面的一些方法中:

  addListener, addListeners。

  setSuccess, trySuccess。

  setFailure, tryFailure。

  notifyListeners的程式碼如下:

 1     private void notifyListeners() {
 2         EventExecutor executor = executor();
 3         if (executor.inEventLoop()) {
 4             final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
 5             final int stackDepth = threadLocals.futureListenerStackDepth();
 6             if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
 7                 threadLocals.setFutureListenerStackDepth(stackDepth + 1);
 8                 try {
 9                     notifyListenersNow();
10                 } finally {
11                     threadLocals.setFutureListenerStackDepth(stackDepth);
12                 }
13                 return;
14             }
15         }
16 
17         safeExecute(executor, new Runnable() {
18             @Override
19             public void run() {
20                 notifyListenersNow();
21             }
22         });
23     }

  這段程式碼的作用是呼叫notifyListenersNow。如果當前執行緒就是executor的執行緒,在第9行直接呼叫notifyListenerNow,否則在第20行,把notifyListnerNow放在executor中執行。第4-7行和11行的作用是防止遞迴呼叫導致執行緒棧溢位,MAX_LISTENER_STACK_DEPTH就是listener遞迴呼叫的最大深度。

  notifyListenerNow的作用是,確保沒有併發執行notifyListener0或notifyListners0方法,且所有的listener只能被通知一次。

 1     private void notifyListenersNow() {
 2         Object listeners;
 3         synchronized (this) {
 4             // Only proceed if there are listeners to notify and we are not already notifying listeners.
 5             if (notifyingListeners || this.listeners == null) {
 6                 return;
 7             }
 8             notifyingListeners = true;
 9             listeners = this.listeners;
10             this.listeners = null;
11         }
12         for (;;) {
13             if (listeners instanceof DefaultFutureListeners) {
14                 notifyListeners0((DefaultFutureListeners) listeners);
15             } else {
16                 notifyListener0(this, (GenericFutureListener<? extends Future<V>>) listeners);
17             }
18             synchronized (this) {
19                 if (this.listeners == null) {
20                     // Nothing can throw from within this method, so setting notifyingListeners back to false does not
21                     // need to be in a finally block.
22                     notifyingListeners = false;
23                     return;
24                 }
25                 listeners = this.listeners;
26                 this.listeners = null;
27             }
28         }
29     }

  第3-11行的作用是防止多個執行緒併發執行11行之後的程式碼。

  結合第5,9,10行可知, listeners中的所有listener只能被通知一次。

  13-17行,通知所有listeners。notifyListener0通知一個listener,notifyListeners0通知所有的listener。

  最後,18-27行,檢查在通知listeners的過程中,是否有新的listener被新增進來。如果有,25,26行得到所有新新增的listener並清空listeners屬性,13-17行繼續通知新新增的listener。否則,執行22,23行結束通知過程。

 1     private void notifyListeners0(DefaultFutureListeners listeners) {
 2         GenericFutureListener<?>[] a = listeners.listeners();
 3         int size = listeners.size();
 4         for (int i = 0; i < size; i ++) {
 5             notifyListener0(this, a[i]);
 6         }
 7     }
 8 
 9     @SuppressWarnings({ "unchecked", "rawtypes" })
10     private static void notifyListener0(Future future, GenericFutureListener l) {
11         try {
12             l.operationComplete(future);
13         } catch (Throwable t) {
14             logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()", t);
15         }
16     }

  1-7行,notifyListeners0對每個listener呼叫一次notifyListener0,引數是當前的future。

  10-16,呼叫listener的operationComplete方法,捕獲了所有的異常,確保接下來可以繼續通知下一個listener。

 

使用await機制同步等待結果

  可以使用一系列的await,awaitXXX方法同步等待結果。這些方法可以分為: 能被打斷的,不能被打斷的。一直等待的,有超時時間的。await0方法是最複雜的等待實現,所有帶超時時間的await方法都會呼叫它。

 1     private boolean await0(long timeoutNanos, boolean interruptable) throws InterruptedException {
 2         if (isDone()) {
 3             return true;
 4         }
 5 
 6         if (timeoutNanos <= 0) {
 7             return isDone();
 8         }
 9 
10         if (interruptable && Thread.interrupted()) {
11             throw new InterruptedException(toString());
12         }
13 
14         checkDeadLock();
15 
16         long startTime = System.nanoTime();
17         long waitTime = timeoutNanos;
18         boolean interrupted = false;
19         try {
20             for (;;) {
21                 synchronized (this) {
22                     if (isDone()) {
23                         return true;
24                     }
25                     incWaiters();
26                     try {
27                         wait(waitTime / 1000000, (int) (waitTime % 1000000));
28                     } catch (InterruptedException e) {
29                         if (interruptable) {
30                             throw e;
31                         } else {
32                             interrupted = true;
33                         }
34                     } finally {
35                         decWaiters();
36                     }
37                 }
38                 if (isDone()) {
39                     return true;
40                 } else {
41                     waitTime = timeoutNanos - (System.nanoTime() - startTime);
42                     if (waitTime <= 0) {
43                         return isDone();
44                     }
45                 }
46             }
47         } finally {
48             if (interrupted) {
49                 Thread.currentThread().interrupt();
50             }
51         }
52     }

  這個方法返回的條件有: (1)isDone()==true;(2)允許被打斷(interrupted==true)的情況下被打斷;(3)已經超時。2-12行分別檢查了這3種情況。

  25,35行管理waiters屬性,這個屬性用來記錄當前正在等待的執行緒數。inWaiters方法正常情況下會把waiters加1,當檢查到waiters==Short.MAX_VALUE時會丟擲異常,防止過多的執行緒等待。

  27行,呼叫wait等待,經歷waitTime後超時返回。在等待過程中,會被setValue0方法呼叫notifyAll喚醒。

  29-33行,處理被打斷的異常,如果執行被打斷,在30行丟擲這個異常返回。

  38-45行,不論什麼原因執行緒被喚醒,檢查是否滿足返回條件,如果不滿足,繼續迴圈等待。

  沒有超時的wait方法實現要簡單一些,只需判讀返回條件(1)(2)。

 

跟蹤非同步操作的執行進度

  如果想要跟蹤非同步操作的執行進度,future需要換成DefaultProgressivePromise物件,listener需要換成GenericProgressiveFutureListener型別。DefaultProgressivePromise派生自DefaultPromise同時實現了ProgressivePromise介面。GenericProgressiveFutureListener介面派生自GenericFutureListener介面。

  ProgressivePromise定義了setProgress和tryProgress方法用來更新進度,是不是很眼熟,和Promise介面定義返回結果的方法很類似。

ProgressivePromise<V> setProgress(long progress, long total);
boolean tryProgress(long progress, long total);

  GenericProgressiveFutureListener定義了operationProgressed方法用來處理進度更新通知。

     void operationProgressed(F future, long progress, long total) throws Exception;

  

  DefaultProgressivePromise自己只實現了setProgress和tryProgress方法,其它都是複用了DefaultPromise的實現。

 1     @Override
 2     public ProgressivePromise<V> setProgress(long progress, long total) {
 3         if (total < 0) {
 4             // total unknown
 5             total = -1; // normalize
 6             if (progress < 0) {
 7                 throw new IllegalArgumentException("progress: " + progress + " (expected: >= 0)");
 8             }
 9         } else if (progress < 0 || progress > total) {
10             throw new IllegalArgumentException(
11                     "progress: " + progress + " (expected: 0 <= progress <= total (" + total + "))");
12         }
13 
14         if (isDone()) {
15             throw new IllegalStateException("complete already");
16         }
17 
18         notifyProgressiveListeners(progress, total);
19         return this;
20     }

  3-12行,檢查progress和total的合法性。

  14行,如isDone()==true,丟擲異常。只有在操作還沒完成的是否更新進度才有意義。

  18行,呼叫notifyProgressiveListeners觸發進度更新通知,這個方法在DefaultPromise中實現。

  notifyProgressiveListeners實現了觸發進度更新通知的主要流程:

 1     void notifyProgressiveListeners(final long progress, final long total) {
 2         final Object listeners = progressiveListeners();
 3         if (listeners == null) {
 4             return;
 5         }
 6 
 7         final ProgressiveFuture<V> self = (ProgressiveFuture<V>) this;
 8 
 9         EventExecutor executor = executor();
10         if (executor.inEventLoop()) {
11             if (listeners instanceof GenericProgressiveFutureListener[]) {
12                 notifyProgressiveListeners0(
13                         self, (GenericProgressiveFutureListener<?>[]) listeners, progress, total);
14             } else {
15                 notifyProgressiveListener0(
16                         self, (GenericProgressiveFutureListener<ProgressiveFuture<V>>) listeners, progress, total);
17             }
18         } else {
19             if (listeners instanceof GenericProgressiveFutureListener[]) {
20                 final GenericProgressiveFutureListener<?>[] array =
21                         (GenericProgressiveFutureListener<?>[]) listeners;
22                 safeExecute(executor, new Runnable() {
23                     @Override
24                     public void run() {
25                         notifyProgressiveListeners0(self, array, progress, total);
26                     }
27                 });
28             } else {
29                 final GenericProgressiveFutureListener<ProgressiveFuture<V>> l =
30                         (GenericProgressiveFutureListener<ProgressiveFuture<V>>) listeners;
31                 safeExecute(executor, new Runnable() {
32                     @Override
33                     public void run() {
34                         notifyProgressiveListener0(self, l, progress, total);
35                     }
36                 });
37             }
38         }
39     }

  第3行,從listeners中選出GenericProgressiveFutureListener型別的listener。

  10-38行。呼叫notifyProgressiveListeners0, notifyProgressiveListener0通知進度跟新。11-17行,在當前執行緒中呼叫。

  19-37行,在executor中呼叫。notifyProgressiveListener0只是簡單地呼叫listener的operationProgressed方法。notifyProgressiveListeners0是對每個listener呼叫一次notifyProgressiveListener0。

  和完成通知相比,進度更新通知要更加簡單。進度更新通知沒有處理併發問題,沒有處理棧溢位問題。