1. 程式人生 > >Netty服務端接收的新連線是如何繫結到worker執行緒池的?

Netty服務端接收的新連線是如何繫結到worker執行緒池的?

 

更多技術分享可關注我

前言

原文:Netty服務端接收的新連線是如何繫結到worker執行緒池的?

前面分析Netty服務端檢測新連線的過程提到了NioServerSocketChannel讀完新連線後會迴圈呼叫服務端Channel繫結的pipeline.fireChannelRead()方法,將每條新連線打包當做引數傳入,然後通過這個方法將其沿著服務端Channel的pipeline傳遞下去,即在Channel的handler鏈條上流動,這部分細節後續會詳細分解。 

下面看下,新連線在服務端Channel的pipeline的流動過程中,Netty配置的boss執行緒池和worker執行緒池是如何配合的。

伺服器的新連線接入器原始碼分析

簡單回顧前面文章:Netty是如何處理新連線接入事件的?中分析了Netty服務端檢測新連線的過程,回憶NioMessageUnsafe類的read()方法原始碼:

看最後的紅色方框,是在迴圈中將新連線順著Channel的pipeline傳遞下去,NioMessageUnsafe是前面說的Netty的Channel的內部介面——Unsafe的服務端的實現類。

那麼這些新連線後續被傳遞時會發生什麼呢?這也是重點問題——即Netty客戶端新連線的Channel被封裝後,如何與Netty的I/O執行緒關聯。下面看之前提到的新連線接入器,關聯的功能主要是這個接入器實現。

言歸正傳看ServerBootstrapAcceptor原始碼,它是一個內部類,繼承了ChannelInboundHandlerAdapter(後面詳解Netty的pipeline機制)。

現在先複習一下服務端啟動流程。服務端啟動的核心操作是繫結埠,即在使用者程式碼中serverBootstrap.bind(xx);方法中啟動,裡面會呼叫ServerBootstrap的doBind方法,在doBind方法裡呼叫了ServerBootstrap的initAndRegister()方法,這是一個初始化服務端Channel並註冊I/O多路複用器的方法,如下圖:

該方法通過反射建立了服務端的NioServerSocketChannel,並且建立儲存了JDK的ServerSocketChannel以及一些元件,比如pipeline等,接著執行Channel的初始化操作——即ServerBootstrap的init(channel)方法(分析的是服務端程式碼,故只看ServerBootstrap類對init的實現),init方法裡就有新連線接入器的建立邏輯。如下紅框處,在init裡配置服務端的pipeline時,預設添加了一個ServerBootstrapAcceptor handler:

先捋一捋完整過程:

1、首先ServerBootstrap的init方法為服務端Channel的pipeline添加了一個ChannelInitializer,在該類實現的void initChannel(Channel ch)方法裡先將使用者程式碼裡配置的服務端的handler新增,前面我也說過,這個服務端的handler配置一般很少用到(即.handler() API),常用的主要是給客戶端配置handler,即.childHandler()

2、然後非同步的新增一個新連線接入器——ServerBootstrapAcceptor,具體的,是把新增ServerBootstrapAcceptor到pipeline的操作封裝為了一個task,委託給服務端的NIO執行緒非同步執行,等到有新連線到來時,該task已執行完畢。即Netty服務端Channel的pipeline最小結構如下:

這裡提前接觸Netty的入站事件和出站事件的概念,所謂入站事件——即inbound事件,即Netty的NIO執行緒主動發起的,是面向使用者業務handler的操作,即都是被動發起的事件,通過fireXXX方法傳播。

比如Channel連線成功,Channel關閉,Channel有資料可讀,Channel上註冊I/O多路複用器成功,Channel解除I/O多路複用器的註冊,異常丟擲等,這些都是被動執行的回撥事件,它們的處理有專門的handler實現,統一叫入站handler。反之還有出站事件和出站handler,出站事件——即outbound事件,都是使用者執行緒或者使用者程式碼主動發起的事件,如下是出站事件:

比如伺服器主動繫結埠,主動關閉連線,客戶端主動連線伺服器,伺服器(客戶端)主動寫出訊息等操作,這些事件的特點就是由使用者主動發起。針對這兩類事件,除了Netty預設提供的handler,使用者還可以自定義入站/出站handler以實現自己的攔截邏輯,這也是職責鏈(也叫責任鏈)模式的思想。

言歸正傳,繼續分析伺服器讀取新連線的過程,現在分析的是新連線接入,故只看入站handler。先知道入站事件流動的順序是從pipeline的頭部節點開始,途徑各個入站handler節點,一直流動到尾部節點結束,這裡就是Head->ServerBootstrapAcceptor->Tail。如下:

還得知道tail節點本質是一個入站handler,head節點本質是一個出站handler,後續會詳細拆解,這裡不知道為什麼也無所謂。

前面說到,NioMessageUnsafe類的read()方法,最後會將讀到的客戶端新連線傳遞出去,如下:

具體來說是觸發後續的各個入站handler的ChannelRead事件(前面說了ChannelRead是一個入站事件),入站事件都是從pipeline的頭部節點——HeadContext開始傳播的,而觸發這個事件傳播的正是pipeline.fireChannelRead(xxx)方法。

還記得服務端啟動的時候,如下有一段程式碼:serverBootstrap.handler(new ServerHandler())serverBootstrap.childHandler(new ServerHandler());

當時給了這樣一個結論:.handler方法新增的handler是新增到服務端Channel的pipeline上,是在服務端初始化的時候就新增的,而.childHandler方法新增的handler是新增到客戶端Channel的pipeline上,是在處理新連線接入的時候新增的。現在知道原因了,ServerBootstrap呼叫init時,先pipeline.addLast(handler),然後新增一個ServerBootstrapAccepter,這樣服務端的pipeline也可能是head-hander>serverBootStrapAccepter>tail這種組成結構,如下(很熟悉的結構):

這裡一定要明白,兩個操作是分別把handler加到了服務端和客戶端的pipeline。

serverBootStrapAccepter本身也是一個入站的handler。根據前面的分析,入站事件的傳播順序是head->使用者定義的入站handler->ServerBootstrapAcceptor->tail,我的demo裡沒有為伺服器定義handler,故直接呼叫到ServerBootstrapAcceptor的channelRead方法,該方法是接入器的重點,需要重點學習,ServerBootstrapAcceptor的channelRead方法原始碼如下;

ServerBootstrapAcceptor是ServerBootstrap的一個內部類。下面看debug過程,一上來就把msg強轉為了Channel,即這裡接收到的msg變數本質是剛剛讀取到的客戶端新連線——被Netty封裝為了其自定義的Channel。後續的ServerBootstrapAcceptor主要做了三件事:

1、黃色1處,就是前面分析的,在接入器裡新增使用者配置的客戶端Channel的handler:即將使用者在伺服器程式碼裡通過.childHandler()自定義的ChannelHandler新增到客戶端的pipeline,後續詳解。

2、黃色2處,設定使用者配置的options和attrs,主要是設定客戶端Channel的childOptions和childAttrs,childOptions是channel底層為TCP協議配置的屬性,childAttrs是channel本身的一些屬性,它的本質是個map,比如可以儲存當前channel存活時間,金鑰等。

3、黃色3處,選擇worker執行緒池的一根NIO執行緒,並將其繫結到該客戶端Channel——即程式碼裡的child變數。這步是非同步操作,並通過register方法實現,這個方法複用了服務端啟動時為服務端Channel註冊I/O多路複用器的程式碼邏輯。這最後一步又分為兩小步:

  • worker執行緒池通過EventLoop的執行緒選擇器——Chooser的next()方法選擇一個NioEventLoop執行緒和新連線繫結,和服務端執行緒池一樣的邏輯

  • 註冊客戶端的新Channel到這個NioEventLoop的I/O多路複用器,併為其註冊OP_READ事件

下面詳細分析這兩小步,我通過debug跟進register,來到了MultithreadEventLoopGroup的register方法,如下原始碼:

最後進入到父類io/netty/util/concurrent/MultithreadEventExecutorGroup類,看到這裡就很熟悉了,會進入到前面分析過的NioEventLoopGroup的執行緒選擇器。

這裡使用的優化方法——通過位運算選擇一個NioEventLoop執行緒。如下發現idx是0,即workerGroup執行緒池裡的執行緒此時才剛剛選擇第一個,因為這是我當前執行的伺服器接收到的第一條客戶端連線,所以後續再來新連線時,會順次啟動後續的執行緒與之繫結,如果繫結到最後一根,那麼idx會重新從0開始,迴圈往復。。。注意此時NIO執行緒還沒有啟動。Netty做了優化,前面也說了,Netty的執行緒池都是延遲啟動的。

在MultithreadEventLoopGroup類的register方法裡選擇NioEventLoop執行緒後,next()方法會返回一個NioEventLoop例項,然後繼續呼叫該例項的register方法,即下一步過會跳轉到NioEventLoop直接父類SingleThreadEventLoop的register方法,如下原始碼:

呼叫到了第二個register方法裡,裡面的channel()方法返回的就是客戶端的NioSocketChannel,unsafe()方法就是NioByteUnsafe例項,即最後呼叫了客戶端channel的Unsafe的register方法。即AbstractChannel的內部類——AbstractUnsafe的register方法,原始碼如下:

看到這個方法的程式碼就應該很熟悉了,我在前面Netty服務端啟動的時候分析過,即給客戶端新連線註冊I/O多路複用器的邏輯複用了這一套程式碼,這也得益於Netty良好的架構設計。

下面再分析一下,執行AbstractUnsafe的register方法的邏輯:

1、首先對當前客戶端的I/O執行緒以及Channel做校驗,然後在黃色1處,判斷當前執行緒是不是NIO執行緒,顯然這裡是false,因為雖然此時已經選擇了一個客戶端NIO執行緒,但是該NIO執行緒還沒有啟動,整個註冊邏輯還是執行在使用者執行緒下,我的demo是main執行緒,如下佐證,故1這裡判斷失敗,接下來執行else裡的程式碼,將真正的註冊邏輯委託給剛剛啟動的客戶端的NIO執行緒非同步執行,這樣做也能保證執行緒安全。

2、看黃色2處,即else程式碼裡,會通過NioEventLoop的execute方法啟動之前選擇的NIO執行緒(當然,如果已經啟動了,那麼會略過啟動步驟),同時驅動註冊的這個task,這裡才真正啟動NIO執行緒,也能佐證Netty的執行緒池實現了延遲啟動,

3、最後看黃色3處,我進入到這個register0方法,看它的實現原始碼,如下:

最關鍵的方法是其中的doRegister()方法,看紅色方框處。我進入該方法,發現其實現在了子類AbstractNioChannel裡。這就非常熟悉了,還是和服務端註冊ServerSocketChannel流程一樣,如下:

正是Netty封裝的JDK註冊Channel的Selector的邏輯。在該方法裡將客戶端Channel註冊到客戶端NioEventLoop執行緒的I/O多路複用器,並將NioSocketChannel物件附加到JDK Channel,不過此時註冊的感興趣的I/O事件還是0,即什麼都不關注,即該客戶端Channel還處於初始化狀態,真正註冊I/O事件還在後面流程裡。

注意該方法將註冊邏輯寫在了一個死迴圈裡,學會這種用法,目的是為了保證一個事情必須完成,即使出現某些異常。

回到register0方法,再看一遍,註冊完成後,會先觸發處於掛起狀態的handlerAdded事件,即先執行黃色1處的程式碼,這裡對應了為該客戶端新連線新增使用者自定義的客戶端handler的邏輯。然後才執行黃色2處,觸發並傳播當前Channel已經註冊成功的事件。如果當前Channel依然存活,那麼會繼續執行3處的程式碼,即為首次註冊的新Channel傳播Channel成功連線(處於活躍狀態)的事件。

最後,如果當前Channel不是第一次註冊,那麼會判斷是否配置的自動讀訊息(Netty預設都是讀優先),如果是,那麼會執行黃色4處的程式碼,後續詳解。

小結

為新連線分配NIO執行緒和對新連線註冊I/O多路複用器的核心——是理解ServerBootstrapAcceptor,並由此知道服務端Channel的pipeline最小構成:Head->ServerBootstrapAcceptor->Tail

理解ServerBootstrapAcceptor:

1.延遲新增childHandler——將自定義ChannelHandler新增到新連線的pipeline,必須等當前Channel註冊I/O多路複用器完畢後,才會新增

2.設定options和attrs——設定childOptions和childAttrs

3.選擇NioEventLoop並註冊到Selector,核心是呼叫worker執行緒池的Chooser的next()方法選擇一個NioEventLoop,通過其doRegister()方法,將新連線註冊到worker執行緒繫結的Selector上。這裡的新連線和Selector是多對一的關係。

歡迎關注

dashuai的部落格是終身學習踐行者,大廠程式設計師,且專注於工作經驗、學習筆記的分享和日常吐槽,包括但不限於網際網路行業,附帶分享一些PDF電子書,資料,幫忙內推,歡迎拍磚!

相關推薦

Netty服務接收連線是如何worker執行的?

  更多技術分享可關注我 前言 原文:Netty服務端接收的新連線是如何繫結到worker執行緒池的? 前面分析Netty服務端檢測新連線的過程提到了NioServerSocketChannel讀完新連線後會迴圈呼叫服務端Channel繫結的pipeline.fireChannelRead()方

遊戲Call技術-執行呼叫CALL技術(反遊戲檢測call)

我們在寫call呼叫遊戲程序裡call時候,經常外掛執行工作一段時間後,遊戲就會斷線或崩潰掉,但是經過檢查, 發現自己呼叫CALL的程式碼又沒發現寫錯誤,這到底是怎麼呢?{:100_162:} 其實這些是現在遊戲的一些遊戲反外掛檢測導致的,並不是我們呼叫call的程式碼沒寫對, 現在的

spring使用ThreadLocal將資源和事務執行

這篇文章想要解釋Spring為什麼會選擇使用ThreadLocal將資源和事務繫結到執行緒上,這背後有著什麼樣的起因和設計動機,通過分析幫助大家更清晰地認識Spring的執行緒繫結機制。 “原始”的資料訪問寫法 訪問任何帶有事務特性的資源系統,像資料庫,都

併發特性—Executor 框架與執行

蘭亭風雨 · 更新於 2018-11-14 09:00:31 併發新特性—Executor 框架與執行緒池 Executor 框架簡介 在 Java 5 之後,併發程式設計引入了一堆新的啟動、排程和管理執行緒的API。Executor 框架便是 Java 5 中引入的,其內部使用了執行緒池機

併發特性—Executor框架與執行(含程式碼)

Executor框架簡介 在Java 5之後,併發程式設計引入了一堆新的啟動、排程和管理執行緒的API。Executor框架便是Java 5中引入的,其內部使用了執行緒池機制,它在java.util.cocurrent 包下,通過該框架來控制執行緒的啟動、執行

JAVA SOCKET網路程式設計,服務接收多個客戶連線的實現

這是基於前面一篇文章的一個簡單擴充套件,當然這不是最好的實現 服務端如果要想接收多個客戶端連線,必須死迴圈去接收新的客戶端的連線請求,一個完整的客戶端服務端socket連線通過一個執行緒來維護 package com.tree.demo.socket; import

安卓使用Socket發送中文,C語言服務接收亂碼問題解決方式

article nbsp ons size ret con pre n+1 utf8 今天用安卓通過Socket發送數據到電腦上使用C語言寫的服務端,發送英文沒有問題,可當把數據改變成中文時,服務端接收到的數據確是亂碼。 突然想到。VS的預處理使用的

HttpURLConnection 發送PUT請求 json請求體 與服務接收

logs esp appid 請求 edi write webtest read tco public void testHttp() { String result = ""; try {

java在線聊天項目0.4版本 制作服務接收連接,客戶連接功能 新增客戶窗口打開時光標指向下邊文本域功能,使用WindowListener監聽WindowAdapter

內部 frame visible [] one exit eve awt dap 建一個服務端類ChatServer,用於設置端口接收連接 package com.swift; import java.io.IOException; import java.net.Se

java在線聊天項目0.9版 實現把服務接收到的信息返回給每一個客戶窗口中顯示功能之客戶接收

nec 一個 out for tex ava 添加 implement com 客戶端要不斷接收服務端發來的信息 與服務端不斷接收客戶端發來信息相同,使用線程的方法,在線程中循環接收 客戶端修改後代碼如下: package com.swift; import java.

netty服務實現心跳超時的主動拆鏈

use ctx out apt ket cond else pipeline ali 一、服務器啟動示例: public class MySocketServer { protected static Logger logger = LoggerFactory.g

使用axis呼叫webservice時,服務接收到的引數為null

通過axis呼叫,需要注意兩點: 1)在call.setOperationName是必須通過Qname來制定namespaceURI 2)在設定引數時,不使用服務端定義的引數名,而是arg0~argN來定義,也不需制定namespaceURI,上述程式碼 call.addParamete

將從服務接收到的資料轉成JSON資料

第一步,從服務端接收資料,並將資料轉成int陣列。 try{ // 建立輸入流物件InputStream is = socket.getInputStream(); int length = is.a

Dubbo原始碼解析之服務接收訊息

準備 dubbo 版本:2.5.4 服務端接收訊息流程 Handler鏈路 DubboProtocol private ExchangeServer createServer(URL url) { url = url.addParameterIfAbsent("c

WPFS資料(要是後臺類物件的屬性值發生改變,通知在“客戶介面與之的控制元件值”也發生改變需要實現INotitypropertyChanged介面)

WPFS資料繫結(要是後臺類物件的屬性值發生改變,通知在“客戶端介面與之繫結的控制元件值”也發生改變需要實現INotitypropertyChanged介面) MainWindow.xaml 1 <Window x:Class="WpfApplication1.MainWindow" 2

mina保持android\服務的長連線

更多幹貨 一.mina簡介 Apache Mina是一個能夠幫助使用者開發高效能和高伸縮性網路應用程式的框架。與Netty出自同一人之手,都是一個介於應用程式與網路之間的NIO框架,通過Java nio技術基於TCP/IP和UDP/IP協議提供了抽象的、事件驅動的、非同步

netty原始碼分析之連線接入全解析

本文收穫 通讀本文,你會了解到 1.netty如何接受新的請求 2.netty如何給新請求分配reactor執行緒 3.netty如何給每個新連線增加ChannelHandler 其實,遠不止這些~ 前序背景 讀這篇文章之前,最好掌握一些前序知識,包括netty中的r

DataOutputStream使用writeBytes寫入中文資料時服務接收到的是亂碼

在開發過程中發現使用HttpURLConnection傳送POST請求,需要同時上傳檔案和其他引數,引數中如果有中文字元,DataOutputStream使用writeBytes,將中文字元傳入的話,服務端接收到的字串會亂碼,但是使用write就不會,故截取了一部分程式碼進行

解決.NET Web API生成的Help無Controller說明&服務接收不到請求

今天在用.NET Web API寫一個介面的時候遇到一個問題。在Controller中新加了一個方法,客戶端就不能請求介面了,當時建WEB API專案是用的VS預設設定,在服務端打斷點一直沒有進去,而APP端一直報服務端響應失敗!奇怪的是連生成的Help API說明都沒Controller說明。

JAX-RS服務接收中文亂碼

總是在追求最新、最牛的技術然而卻忘本了,在此記錄一下: 還記得在學servlet的時候,處理中文亂碼時:如果是GET提交則用byte[] nameByte = str.getBytes("ISO-88