Netty 接受請求過程原始碼分析 (基於4.1.23)
前言
在前文中,我們分析了伺服器是如何啟動的。而伺服器啟動後肯定是要接受客戶端請求並返回客戶端想要的資訊的,否則要你伺服器幹啥子呢?所以,我們今天就分析分析 Netty 在啟動之後是如何接受客戶端請求的。
開始吧!
1. 從源頭開始
從之前伺服器啟動的原始碼中,我們得知,伺服器最終註冊了一個 Accept 事件等待客戶端的連線。我們也知道,NioServerSocketChannel 將自己註冊到了 boss 單例執行緒池(reactor 執行緒)上,也就是 EventLoop 。
樓主還沒有仔細介紹 EventLoop ,但樓主這裡先稍微講一下他的邏輯:
EventLoop 的作用是一個死迴圈,而這個迴圈中做3件事情:
- 有條件的等待 Nio 事件。
- 處理 Nio 事件。
- 處理訊息佇列中的任務。
而我們今天看的就是第二個步驟。
首先需要進入到 NioEventLoop 原始碼中。
2. 開始 debug
進入到 NioEventLoop 原始碼中後,找到 private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) 方法 ,斷點打在下方:
debug 啟動我們的 EchoServer 的 main 方法。在瀏覽器鍵入 http://localhost:8007/,開始訪問我們的 Netty 伺服器,這時候,斷點開始卡住。
從上圖中的斷點我們可以看到, readyOps 是16 ,也就是 Accept 事件。說明瀏覽器的請求已經進來了。那麼這個 unsafe 是誰呢?就是 boss 執行緒中 NioServerSocketChannel 的AbstractNioMessageChannel$NioMessageUnsafe 物件。
AbstractNioMessageChannel$NioMessageUnsafe # read 方法
@Override public void read() { assert eventLoop().inEventLoop(); final ChannelConfig config = config(); final ChannelPipeline pipeline = pipeline(); boolean closed = false; Throwable exception = null; try { try { do { int localRead = doReadMessages(readBuf); } while (allocHandle.continueReading()); } catch (Throwable t) { exception = t; } int size = readBuf.size(); for (int i = 0; i < size; i ++) { readPending = false; pipeline.fireChannelRead(readBuf.get(i)); } readBuf.clear(); allocHandle.readComplete(); pipeline.fireChannelReadComplete(); } finally { if (!readPending && !config.isAutoRead()) { removeReadOp(); } } }
樓主限於篇幅,精簡了很多程式碼,我們拆解一下程式碼:
- 檢查該 eventloop 執行緒是否是當前執行緒。
- 執行 doReadMessages 方法,並傳入一個 readBuf 變數,這個變數是一個 List,也就是容器。
- 迴圈容器,執行 pipeline.fireChannelRead(readBuf.get(i));
我們分析一下上面的步驟:doReadMessages 肯定是讀取 boss 執行緒中的 NioServerSocketChannel 接受到的請求。並把這些請求放進容器,然後呢?迴圈容器中的所有請求,呼叫 pipeline 的 fireChannelRead 方法,用於處理這些接受的請求或者其他事件。
那麼我們就來驗證一下。進入 doReadMessages 方法。
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
SocketChannel ch = SocketUtils.accept(javaChannel());
buf.add(new NioSocketChannel(this, ch));
return 1;
}
樓主精簡了程式碼,可以看到,和我們猜的不差,該方法很簡單,通過工具類,呼叫 NioServerSocketChannel 內部封裝的 serverSocketChannel 的 accept 方法,熟悉的 Nio 做法。然後獲取到一個 JDK 的 SocketChannel,然後,使用 NioSocketChannel 進行封裝。最後新增到容器中。
3. NioSocketChannel 是如何建立的?
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
SocketChannel ch = SocketUtils.accept(javaChannel());
buf.add(new NioSocketChannel(this, ch));
return 1;
}
我們另起一段研究這段程式碼,先看 SocketUtils.accept(javaChannel());
public static SocketChannel accept(final ServerSocketChannel serverSocketChannel) throws IOException {
try {
return AccessController.doPrivileged(new PrivilegedExceptionAction<SocketChannel>() {
@Override
public SocketChannel run() throws IOException {
return serverSocketChannel.accept();
}
});
} catch (PrivilegedActionException e) {
throw (IOException) e.getCause();
}
}
該方法呼叫了 NioServerSocketChannel 中的 serverSocketChannel.accept() 方法。返回了一個 Nio 的通道,注意:這個通道,就是我們剛剛 Boss 執行緒監聽到的 Accept 事件,相當於一個 Tcp 連線。
然後我們看 NioSocketChannel 的建立過程,其中引數 this 是 NioServerSocketChannel ,這個就是 SocketChannel 的 parent 屬性,ch 是 SocketChannel 。構造方法如下:
和 ServerSocket 類似,還記得 ServerSocket 是怎麼建立的嗎:
還是很相似的。
我們先略過 config 的建立過程,先看 super。
這裡設定了 SelectableChannel 屬性為 JDK 的 Nio 的 SocketChannel 和 感興趣的事件。設定非阻塞。
進入到 super 構造方法中:
也是和 ServerSocket 一樣了,注意:這裡的 unsafe 就和 ServerSocket 不同了,此方法被重寫了,返回的是
NioSocketChannel$NioSocketChannelUnsafe, 是 NioSocketChannel 的內部類。再看 pipeline ,是相同的 DefaultChannelPipeline。同樣 pipeline 也會自己建立自己的 head 節點和 tail 節點。
好了,到這裡,NioSocketChannel 就建立完畢了。
回到 最初的 read 方法中。
4. 迴圈執行 pipeline.fireChannelRead 方法
從上面我們可以看到,doReadMessages 方法的作用是通過 ServerSocket 的 accept 方法獲取到 Tcp 連線,然後封裝成 Netty 的 NioSocketChannel 物件。最後新增到 容器中。
然後再 read 方法中,迴圈呼叫 ServerSocket 的 pipeline 的 fireChannelRead 方法。從這個方法的名字可以感受到:開始執行 管道中的 handler 的 ChannelRead 方法。
那麼我們就看看:
到這裡,樓主就不一個一個 dubug 了,實際上,我們知道,pipeline 裡面又 4 個 handler ,分別是 Head,LoggingHandler,ServerBootstrapAcceptor,Tail。我們重點關注 ServerBootstrapAcceptor。debug 之後,斷點會進入到 ServerBootstrapAcceptor 中來。我們來看看 ServerBootstrapAcceptor 的 channelRead 方法。
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;
child.pipeline().addLast(childHandler);
setChannelOptions(child, childOptions, logger);
for (Entry<AttributeKey<?>, Object> e: childAttrs) {
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
try {// 將客戶端連線註冊到 worker 執行緒池
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
我們講該方法拆解:
- msg 強轉成 Channel ,實際上就是 NioSocketChannel 。
- 新增 NioSocketChannel 的 pipeline 的 handler ,就是我們 main 方法裡面設定的 childHandler 方法裡的。
- 設定 NioSocketChannel 的各種屬性。
- 最重要的,將該 NioSocketChannel 註冊到 childGroup 中的一個 EventLoop 上,並新增一個監聽器。
我們重點看最後一步,這個 childGroup 就是我們 main 方法建立的陣列大小為 16 的 workerGroup。在建立 ServerBootstrapAcceptor 新增進來的。
進入 register 方法檢視:
這裡的 next 方法我們之前介紹過了,使用位運算獲取陣列中的EventLoop。
這裡建立 DefaultChannelPromise 我們之前也看過了,最後該方法返回的就是這個 DefaultChannelPromise。
這裡鏈式呼叫說明一下:
- premise 的 channel 方法返回的是 NioSocketChannel。
- promise.channel().unsafe() 返回的是 NioSocketChannel$NioSocketChannelUnsafe。
所以最終呼叫的是 NioSocketChannel 的內部類的 register 方法。引數是當前的 EventLoop 和 promise。
檢視這個 register 方法:
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
eventLoop.execute(new Runnable() {// 開始真正的非同步,boss 執行緒開始啟動
@Override
public void run() {
register0(promise);
}
});
}
樓主精簡了一下程式碼邏輯,其實就是同步或者非同步的呼叫 register0 方法。大家可以向一下到底是非同步還是同步?應該是非同步的。應為此時的執行緒是 boss 執行緒,而不是 worder 執行緒,所以肯定無法通過 inEventLoop 判斷。
進入到非同步執行緒中檢視 register0 方法。其實和我們之前分析的註冊 ServerSocket 的過程是一樣的。
其中最核心的就是 doRegister 方法。
doRegister 方法
pipeline.invokeHandlerAddedIfNeeded() 方法
回到 register0 中,該方法在成功註冊到 selector 的讀事件後,繼續執行管道中可能存在的任務。那麼管道中會存在什麼任務呢?我們來看看:
到這裡,我們發出疑問,這個 task 從哪裡來的?
經過查詢,我們發現,這個 pendingHandlerCallbackHead 變數來自我們 addLast 的時候,如果該 pipeline 還沒有註冊到這個 eventLoop 上,則將這個包裝過 handler 的 context 放進變數 pendingHandlerCallbackHead 中,事實上,這個 pendingHandlerCallbackHead 就是個連結串列的表頭,後面的 Context 會被包裝成一個任務,追加到連結串列的尾部。
那麼這個 execute 方法如何執行呢?
主要是執行 callHandlerAdded0 方法,並且傳入這個 Context:
注意:這裡呼叫了包裝了自定義 handler 的 Context 的 handlerAdded 方法,並且傳入了這個 Context。然後這個方法我們並沒有重寫,我們看父類中方法邏輯:
完美,呼叫了 initChannel 方法。但注意,這裡呼叫的並不是我們重寫的 initChannel 方法,因為引數不是同一個型別,我們重寫的方法的引數是 SocketChannel,而不是ChannelHandlerContext,所以,肯定還需要再呼叫一層。
這裡才是呼叫使用者程式碼的地方。
我們的使用者程式碼添加了兩個處理器,還有一個自定義的處理器。當然,現在新增處理器不會再新增到那個 pendingHandlerCallbackHead 任務連結串列裡了,因為已經註冊過了,if 判斷過不了。
operationComplete 方法
然後設定promise 的 operationComplete 方法。還記得我們在ServerBootstrap 的 channelRead 方法中的程式碼嗎?
在這裡呼叫了我們之前的設定的監聽器的 operationComplete 方法。
pipeline.fireChannelRegistered() 方法
好,再然後呼叫 pipeline.fireChannelRegistered() 的方法。大家可以按照之前的 pipeline 的路子想一下,會如何執行?pipeline 作為管道,其中有我們設定的 handler 連結串列,這裡,肯定會順序執行我們的 handler,比如 main 方法中的 childerHandler。我們繼續驗證一下。
該方法會繼續呼叫橋樑 Context 的 fireChannelRegistered 方法,Context 包裝的就是我們自定義的 handler。當然我們沒有重寫該方法。我們只重寫了 initChannel 方法。
pipeline.fireChannelActive() 方法
回到 register0 方法中,我們繼續向下走,如果是第一次註冊的話,執行pipeline.fireChannelActive()程式碼,也就是執行 pipeline 管道中的 handler 的 ChannelActive 方法。
同樣,我們也沒有重寫該方法,父類會繼續回撥 fireChannelActive 方法。而這個方法裡會繼續尋找下一個 Context,然後繼續呼叫,直到遇到 pipeline 的 channelActive(ChannelHandlerContext ctx) 方法:
這裡有一行 readIfIsAutoRead 方法,我們注意一下,上面的 ChannelActive 方法都執行結束後,也就是已經連線已經成功後,便開始呼叫read方法。
同樣的,如果熟悉伺服器啟動過程的同學肯定看出來了,這裡最終會呼叫 doBeginRead 方法,也就是 AbstractNioChannel 類的方法。
在之前的 doRegister 方法中,只是註冊了0,為什麼呢?如果直接註冊1,也就是讀事件,但系統還沒有準備好讀取,現在一切都初始化了,就可以讀取了。而這裡是管道的 head 節點呼叫 unsafe 方法的。
到這裡,針對於這個客戶端的連線就完成了,接下來就可以監聽讀事件了。
總結:伺服器接受客戶端過程
- 伺服器輪詢 Accept 事件,獲取事件後呼叫 unsafe 的 read 方法,這個 unsafe 是 ServerSocket 的內部類,該方法內部由2部分組成。
- doReadMessages 用於建立 NioSocketChannel 物件,該物件包裝 JDK 的 Nio Channel 客戶端。該方法會像建立 ServerSocketChanel 類似建立相關的 pipeline , unsafe,config。
- 隨後執行 執行 pipeline.fireChannelRead 方法,並將自己繫結到一個 chooser 選擇器選擇的 workerGroup 中的一個 EventLoop。並且註冊一個0,表示註冊成功,但並沒有註冊讀(1)事件.
- 在註冊的同時,呼叫使用者程式中設定的 ChannelInitializer handler,向管道中新增一個自定義的處理器,隨後立即刪除這個 ChannelInitializer handler,為什麼呢?因為初始化好了,不再需要。
- 其中再呼叫管道的 channelActive 方法中,會將曾經註冊過的 Nio 事件改成讀事件,開始真正的讀監聽。到此完成所有客戶端連線的讀前準備。
總的來說就是:接受連線----->建立一個新的NioSocketChannel----------->註冊到一個 worker EventLoop 上--------> 註冊selecot Read 事件。
當然,關於,獲取到讀事件後該怎麼處理還沒有說,限於篇幅,留在下篇文章中。
good luck!!!!