1. 程式人生 > >Netty核心概念(9)之Future

Netty核心概念(9)之Future

con RR 綜合 sta 處理 監聽器 ade 流程 override

1.前言

 第7節講解JAVA的線程模型中就說到了Future,並解釋了為什麽可以主線程可以獲得線程池任務的執行後結果,變成一種同步狀態。秘密就在於Java將所有的runnable和callable任務,統一變成了callable,最終包裝成了FutureTask對象,該類實現了Runnable接口和Future接口,所以FutureTask能夠被線程執行。最終異步執行過程全部由該類控制邏輯,所以在get的時候鎖住了該類,run方法執行的時候釋放了鎖,這樣就滿足了能夠在異步線程執行完畢獲取相關結果的能力。

 本章介紹一下Netty對Future的設計,Netty的聲明就是一個異步事件驅動框架,上一節學習了整個線程調度的過程,並在最後給出了前幾節的一個綜合流程圖,雖然圖中提到了幾種Future,但是沒有具體介紹細節,這些將在本節得到解釋。

2.相關概念

2.1 Future

 雖然Java中已經定義了Future,但是滿足不了Netty的需求,所以Netty新寫了一個Future接口,繼承了JDK的Future。額外方法定義如下圖:

技術分享圖片

 接口主要追加了兩個功能:1.增加了判斷任務是否成功失敗的方法,以及失敗獲取異常信息;2.增加了任務完成時觸發的監聽器

2.2 Promise

技術分享圖片

 該類繼承自Future,自然是增加了額外的功能了:這是一個可寫的Future。什麽意思呢?通過之前的知識,我們知道Future都是由異步線程控制的,主線程是無法控制線程執行的。Promise的作用就是主線程能夠控制一下執行的任務。

  setSuccess():標記任務成功,並觸發所有listener。如果任務早就成功或失敗,則拋出異常

  trySuccess():同上,但是失敗只是返回false,而不是拋出異常

 這裏就解釋這兩個方法,其他方法是覆蓋了父接口的方法,確定返回的具體類型而已。根據方法我們也能大體明白可寫的含義了。

3 主要Future詳解

3.1 DefaultChannelPromise

 該類是在註冊channel時創建的,SingleThreadEventLoop的register方法。和Java的FutureTask不同,FutureTask是作為一個任務交給線程池,在內部控制任務執行。DefaultChannelPromise則是持有了Channel和EventExecutor二者,在外邊處理邏輯。其上層有2個抽象父類:

  1.AbstractFuture:

    該類就實現了get方法,原理是調用了await()方法,await之後喚醒肯定就是任務結束了,判斷有無異常,最終返回結果還是拋出異常。

  2.DefaultPromise:

    該類提供了一個Promise應該具備的基本實現。對任務標記結果,觸發listener等。其主要有個result,對結果進行CAS操作來判斷任務是否完成。

    setSuccess過程如下:1.設置成功的結果;2.觸發所有的listener。設置結果主要是CAS更新result字段,然後判斷是否有get請求等待任務執行完,直接notifyAll即可。觸發listener的過程在於先判斷當前線程是否是事件線程中,觸發方法必須由EventLoop線程執行,然後就是遍歷觸發listener的operationComplete方法。

    await過程如下:1.判斷是否執行完,執行完直接返回;2.判斷線程是否中斷,中斷拋出異常;3.檢查死鎖,即wait操作不能在EventLoop的線程中執行;4.如果沒執行完,等待者計數,然後wait。

 DefaultChannelPromise相對於DefaultPromise而言只是增加了一個channel字段,其它的方法都是調用父類方法。接下來我們看看register過程是怎麽使用這個Promise的吧。

    public final void register(EventLoop eventLoop, final ChannelPromise promise) {
            if (eventLoop == null) {
                throw new NullPointerException("eventLoop");
            }
            if (isRegistered()) {
                promise.setFailure(new IllegalStateException("registered to an event loop already"));
                return;
            }
            if (!isCompatible(eventLoop)) {
                promise.setFailure(
                        new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
                return;
            }

            AbstractChannel.this.eventLoop = eventLoop;

            if (eventLoop.inEventLoop()) {
                register0(promise);
            } else {
                try {
                    eventLoop.execute(new Runnable() {
                        @Override
                        public void run() {
                            register0(promise);
                        }
                    });
                } catch (Throwable t) {
                    logger.warn(
                            "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                            AbstractChannel.this, t);
                    closeForcibly();
                    closeFuture.setClosed();
                    safeSetFailure(promise, t);
                }
            }
        }

 可以看到在註冊過程中實際上就是使用setXXX方法來處理相關邏輯的,這個和Java的FutureTask采取了不同的方式。

3.2 SucceededFuture

 上面講了一個Promise控制主線程和線程池的同步狀態,那個是依靠promise才有的setXXX接口來觸發的。那麽Future是怎麽控制的呢?答案是Future不需要控制,返回Future的時候就已經有結果了,並且返回一定是一個同步過程。

 以SucceededFuture為例。Bootstrap中的doResolveAndConnect0方法有段:final Future<SocketAddress> resolveFuture = resolver.resolve(remoteAddress);其解析成功就會返回帶有結果的SucceededFuture。看這個類的sync方法也和Promise的不一樣,Promise是await,Future是直接返回。這個可以說明Future和Promise的區別:Future用於同步任務,Promise用於異步任務。不知道Netty為什麽會設計成這樣,讓人會有些疑惑。但是記住這點,再加上Promise的set方法達成的效果,就可以理解Netty的Future了。

4.總結

 Netty的Future設計采取了和Java的FutureTask不同的設計思路。Java的思路是將Futuren包裝成一個任務,這樣異步線程執行這個FutureTask的時候,其就可以知道任務的執行狀態。Netty將Future擴展成了Promise。Future作為同步方法直接返回的結果類,使用較少。Promise提供了setXXX方法,給異步線程調用該方法告知執行狀態。相同的地方在於Promise也必須被異步線程持有,才能使用set方法。

Netty核心概念(9)之Future