死磕Netty原始碼之新連線接入原始碼解析
前言
本部落格主要是介紹Netty在新連線接入後的相關處理
新連線建立
新連線建立可以分為以下三個步驟
1.檢測到有新的連線
2.將新的連線註冊到Worker執行緒組
3.註冊新連線的讀事件
在Reactor執行緒模型詳解部落格中我們已經知道當服務端讀取到IO事件(新連線接入事件)後,會呼叫processSelectedKey方法對事件進行處理,此處以新連線接入事件為例它最後會呼叫底層的unsafe進行read操作
public void read() {
assert eventLoop().inEventLoop();
final ChannelPipeline pipeline = pipeline();
final RecvByteBufAllocator.Handle allocHandle = unsafe ().recvBufAllocHandle();
do {
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}
} while (allocHandle.continueReading());
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
pipeline.fireChannelReadComplete();
}
這裡有兩個主要的方法:
1.doReadMessages
2.pipeline.fireChannelRead
doReadMessages
protected int doReadMessages(List<Object> buf) throws Exception {
SocketChannel ch = javaChannel().accept();
try {
if (ch != null) {
buf.add(new NioSocketChannel(this, ch));
return 1;
}
} catch (Throwable t) {
// ...
}
return 0;
}
該方法主要作用是通過JDK底層的API獲取到SocketChannel,然後包裝成Netty自己的NioSocketChannel。NioSocketChannel與服務端啟動時建立的NioServerSocketChannel最主要的區別在於它們關注的事件不同,NioSocketChannel的構造方法如下
public NioSocketChannel(Channel parent, SocketChannel socket) {
super(parent, socket);
config = new NioSocketChannelConfig(this, socket.socket());
}
protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
super(parent, ch, SelectionKey.OP_READ);
}
這裡我們看到一個SelectionKey.OP_READ,說明這個Channel關心讀事件而服務端的Channel關心ACCEPT事件。接下來呼叫父類AbstractNioChannel構造,後續過程與服務端啟動流程一致此處不再贅述
pipeline.fireChannelRead
接著來看pipeline.fireChannelRead(readBuf.get(i))方法,關於Pipeline我們將在下一篇部落格中詳細介紹。我們知道客戶端在啟動的過程中會往Pipeline中新增一個ServerBootstrapAcceptor(連線處理器的東西),所以到這裡服務端Channel對應的Pipeline的資料結構為:Hea⇋ServerBootstrapAcceptor⇋Tail,在呼叫pipeline.fireChannelRead時會依次觸發這三個節點上的channelRead方法,接下來我們重點關注ServerBootstrapAcceptor的channelRead方法,程式碼如下
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;
child.pipeline().addLast(childHandler);
setChannelOptions(child, childOptions, logger);
for (Entry<AttributeKey<?>, Object> e: childAttrs) {
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
try {
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
首先獲取我們之前例項化的NioSocketChannel,然後將我們設定的chlidHandler新增到NioSocketChannel對應的Pipeline中(這裡的chlidHandler對應使用者通過.childHandler()設定的Handler),程式碼執行到這裡NioSocketChannel中Pipeline對應的資料結構為: head⇋ChannelInitializer⇋tail,接著設定NioSocketChannel對應的attr和option,然後進入到childGroup.register(child),這裡的childGroup就是WorkerGroup,接下來我們進入NioEventLoopGroup的register方法
public ChannelFuture register(Channel channel) {
return next().register(channel);
}
public ChannelFuture register(Channel channel) {
return register(new DefaultChannelPromise(channel, this));
}
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
promise.channel().unsafe().register(this, promise);
return promise;
}
這段程式碼和服務端啟動的時候像BossGroup註冊NioServerSocketChannel是類似的,通過next()方法獲取到NioEventLoop然後將Channel註冊到該NioEventLoop上(即將該Channel與NioEventLoop上的Selector進行繫結)。註冊的邏輯最終是交給unsafe物件完成的,我們繼續跟進unsafe的register方法,程式碼如下
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
//...
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
//...
}
}
}
由於這裡是在Boss執行緒中執行的IO操作所以不會是跟Worker執行緒是同一個執行緒,所以eventLoop.inEventLoop()返回false,最後會通過eventLoop.execute的方式去執行註冊任務。在Reactor執行緒模型中我們講到在呼叫execute的時候,如果是首次新增任務那這個NioEventLoop執行緒會被啟動,所以從此Worker執行緒開始執行,接下來看下具體的註冊邏輯
private void register0(ChannelPromise promise) {
try {
boolean firstRegistration = neverRegistered;
doRegister();
neverRegistered = false;
registered = true;
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
pipeline.fireChannelRegistered();
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
beginRead();
}
}
} catch (Throwable t) {
//...
}
}
和服務端啟動過程一樣,先是呼叫doRegister()執行真正的註冊過程
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
selectionKey = javaChannel().register(eventLoop().selector, 0, this);
return;
} catch (CancelledKeyException e) {
//...
}
}
}
該Channel繫結到NioEventLoop對應的Selector上去,後續該Channel的事件輪詢、事件處理、非同步Task執行都由此執行緒負責,繫結完Reactor執行緒之後呼叫pipeline.invokeHandlerAddedIfNeeded()程式碼如下
final void invokeHandlerAddedIfNeeded() {
assert channel.eventLoop().inEventLoop();
if (firstRegistration) {
firstRegistration = false;
callHandlerAddedForAllHandlers();
}
}
往下跟callHandlerAddedForAllHandlers方法
private void callHandlerAddedForAllHandlers() {
final PendingHandlerCallback pendingHandlerCallbackHead;
synchronized (this) {
assert !registered;
registered = true;
pendingHandlerCallbackHead = this.pendingHandlerCallbackHead;
this.pendingHandlerCallbackHead = null;
}
PendingHandlerCallback task = pendingHandlerCallbackHead;
while (task != null) {
task.execute();
task = task.next;
}
}
這裡有個物件叫pendingHandlerCallbackHead,我們發現它是在callHandlerCallbackLater方法中被初始化的
private void callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added) {
assert !registered;
PendingHandlerCallback task = added ? new PendingHandlerAddedTask(ctx) : new PendingHandlerRemovedTask(ctx);
PendingHandlerCallback pending = pendingHandlerCallbackHead;
if (pending == null) {
pendingHandlerCallbackHead = task;
} else {
// Find the tail of the linked-list.
while (pending.next != null) {
pending = pending.next;
}
pending.next = task;
}
}
當我們在Channel註冊到之前新增或刪除Handler時,此時沒有EventExecutor可執行HandlerAdd或HandlerRemove事件,所以Netty為此事件生成一個相應任務等註冊完成後在呼叫執行任務。新增或刪除任務可能有很多個,DefaultChannelPipeline使用一個連結串列儲存,連結串列頭部為先前的欄位pendingHandlerCallbackHead
接下來我們繼續分析task.execute方法, 它主要是完成NioSocketChannel對應的Pipeline的初始化
void execute() {
// ...
callHandlerAdded0(ctx);
// ...
}
通過上面對pendingHandlerCallbackHead的分析,這裡肯定會呼叫ChannelInitializer的handlerAdded方法
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
if (ctx.channel().isRegistered()) {
initChannel(ctx);
}
}
private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) {
try {
initChannel((C) ctx.channel());
} catch (Throwable cause) {
exceptionCaught(ctx, cause);
} finally {
remove(ctx);
}
return true;
}
return false;
}
ChannelInitializer的initChannel主要完成兩個功能以下兩個功能
首先呼叫initChannel((C) ctx.channel())進入使用者自定義的程式碼完成Pipeline的初始化
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
}
})
然後在finally中呼叫remove方法將ChannelInitializer刪除
private void remove(ChannelHandlerContext ctx) {
try {
ChannelPipeline pipeline = ctx.pipeline();
if (pipeline.context(this) != null) {
pipeline.remove(this);
}
} finally {
initMap.remove(ctx);
}
}
執行該方法前NioSocketChannel對應的Pipeline的資料結構為:head⇋ChannelInitializer⇋tail,執行該方法後ChannelInitializer被刪除,NioSocketChannel對應的Pipeline的資料結構為:head⇋自定義的HandlerContext⇋tail
到目前為止我們完成了新連線的註冊、pipeline的繫結,但是新連線註冊的時候的感興趣事件還是0還無法進行讀寫操作,新連線對讀事件的繫結是在pipeline.fireChannelActive方法中完成的,它最後會呼叫到AbstractNioChannel的doBeginRead
protected void doBeginRead() throws Exception {
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}
readPending = true;
final int interestOps = selectionKey.interestOps();
if ((interestOps & readInterestOp) == 0) {
selectionKey.interestOps(interestOps | readInterestOp);
}
}
前面register0()方法的時候向selector註冊的事件程式碼是0,而readInterestOp對應的事件程式碼是SelectionKey.OP_READ,所以本段程式碼的用處是將SelectionKey.OP_READ事件註冊到Selector中去,fireChannelActive的執行邏輯在服務端啟動過程中有詳細描述,至此已完成客戶端新連線接入的操作。下一篇部落格將介紹Pipeline相關的原始碼解析