Netty原始碼分析之流水線
上一篇分析Netty執行緒模型,今天分析Netty另外一個重點流水線Pipe。
一、流水線處理邏輯
Netty把各個事件放到Pipe中,進行自動化處理,這個做法非常棒!!思想非常獨特,使用者不需要關心是哪個事件?怎麼處理?使用者只需要把ChannelHandler設定到Pipe中就行了。下圖是Netty流水線經典處理流程:
左側是接收到的訊息處理流程,右側是傳送的訊息處理流程。Netty流水線實現方式很簡單,就是雙向連結串列(非迴圈連結串列)。它把所有的ChannelHandler放到連結串列中,如果是read事件,它從連結串列頭開始處理,如果是write事件,它從連結串列尾開處理。
二、流水線
流水線Pipe不需要使用者建立,這部分對使用者來說,完全是透明的。那麼在什麼地方建立的呢?
protected AbstractChannel(Channel parent, EventLoop eventLoop) {
this.parent = parent;
this.eventLoop = validate(eventLoop);
unsafe = newUnsafe();
pipeline = new DefaultChannelPipeline(this);
}
由上面程式碼可知,流水線是在建立Channel的時候,new出來的Pipe。下面先看兩個函式,這個兩個函式邏輯比較繞,首先是
private static int skipFlags0(Class<? extends ChannelHandler> handlerType) { int flags = 0; try { if (handlerType.getMethod( "handlerAdded", ChannelHandlerContext.class).isAnnotationPresent(Skip.class)) { flags |= MASK_HANDLER_ADDED; } if (handlerType.getMethod( "handlerRemoved", ChannelHandlerContext.class).isAnnotationPresent(Skip.class)) { flags |= MASK_HANDLER_REMOVED; } ... } catch (Exception e) { // Should never reach here. PlatformDependent.throwException(e); } return flags; }
這個裡面大部分都是if判斷,主要檢查某個方法是否存在,並且是否設定了skip(註解),才會設定flags bit位置。我們從反方向考慮:ChannelHandlerAdapter父類這些方法都已經實現並且都存在skip註解,那麼要是想某個方法不進入這個if分支,該如何做呢?
1)繼承ChannelHandlerAdapter
2)複寫父類中某個方法並且不要設定skip標誌。
第二個方法,是findContextInbound/findContextOutbound
public ChannelHandlerContext fireChannelRegistered() {
DefaultChannelHandlerContext next = findContextInbound(MASK_CHANNEL_REGISTERED);
next.invoker.invokeChannelRegistered(next);
return this;
}
private DefaultChannelHandlerContext findContextInbound(int mask) {
DefaultChannelHandlerContext ctx = this;
do {
ctx = ctx.next;
} while ((ctx.skipFlags & mask) != 0);
return ctx;
}
最繞人的邏輯就是 (ctx.skipflags & mask) != 0。
假設mask是MASK_CHANNEL_REGISTERED,如果skipFlags中mask則&運算是非0,那麼while迴圈會繼續,會取下一個上下文。
從方法名findContextInbound可知,是要根據mask查詢上下文。如果根據mask要想查詢某個上下文,需要怎麼做呢?
1) 繼承ChannelHandlerAdapter
2) 複寫父類中某個方法並且不要設定skip標誌。
這段邏輯不好理解,可能描述也沒有描述太清楚,希望網友自己在去理會啊!!說了這麼多flags,它到到底有什麼作用呢?它的最大作用是用於區別InBound/OutBound。不知道大家有沒有疑慮,通過介面channel.pipeline().addLast(...),設定流水線,那麼netty是否怎麼知道,這個handler是用於序列化?還是反序列化呢?其實就是用flags判斷。這個也是netty5.0和netty4.0最大的區別。
三、流水線動態新增
Netty流水線中ChannelHandler是支援動態插入、刪除的。這個特點在《Netty權威指南》中也有提到。接下來分析一下動態插入,首先看一下程式碼。
public void bind(int port) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.childHandler(new ChildChannelHandler());
ChannelFuture f = b.bind(port).sync();
f.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
protected void initChannel(SocketChannel ch) throws Exception {
// 將監聽事件 註冊到ChannelPipe流水線中 放到連結串列中 也可以註冊多個監聽事件 可以指定名字如果沒有名字 會自動生成
ch.pipeline().addLast("GetTime", new TimeServerHandler());
}
}
由上面程式碼可知,在啟動服務的時候,設定了一個內部類ChildChannelHandler。該類主要用於將解碼器,新增到流水線中。最開始我認為,服務啟動之後,initChannel方法就會呼叫,其實不然,initChannel方法是在客戶端與服務建鏈成功之後才會被呼叫到。那麼是在具體什麼程式碼中呼叫到呢?
我們在上一篇介紹執行緒模型時提到了register0方法,在該方法中有一行這樣的程式碼pipeline.fireChannelRegistered(),然後呼叫下面幾個方法:
@Override
public ChannelHandlerContext fireChannelRegistered() {
DefaultChannelHandlerContext next = findContextInbound(MASK_CHANNEL_REGISTERED);
next.invoker.invokeChannelRegistered(next);
return this;
}
ChannelInitializer.java中的
public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
ChannelPipeline pipeline = ctx.pipeline();
boolean success = false;
try {
initChannel((C) ctx.channel());
pipeline.remove(this);
ctx.fireChannelRegistered();
success = true;
} catch (Throwable t) {
logger.warn("Failed to initialize a channel. Closing: " + ctx.channel(), t);
} finally {
if (pipeline.context(this) != null) {
pipeline.remove(this);
}
if (!success) {
ctx.close();
}
}
}
在這個方法中有initChannel((C)ctx.channel());這段程式碼就會呼叫到上面initChannel方法,然後在流水線中動態新增解碼器。
通過上面程式碼,有一個地方需要再詳細介紹一下,在查詢ChannelHandleContext上下文,是通過MASK_CHANNEL_REGISTERED,那麼在什麼地方設定的這個標誌呢?
private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
protected void initChannel(SocketChannel ch) throws Exception {
// 將監聽事件 註冊到ChannelPipe流水線中 放到連結串列中 也可以註冊多個監聽事件 可以指定名字如果沒有名字 會自動生成
ch.pipeline().addLast("GetTime", new TimeServerHandler());
}
}
在設定childHandler時候定義了一個內部私有類,這個類繼承了ChannelInitializer。父類ChannelInitializer覆寫了方法channelRegistered,該方法就是MASK_CHANNEL_REGISTERED。
至此,分析Netty的流水線已經完成,有分析不到的位請大家多多指點。