Netty原始碼分析之ChannelPipeline
每個channel內部都會持有一個ChannelPipeline物件pipeline. pipeline預設實現DefaultChannelPipeline內部維護了一個DefaultChannelHandlerContext連結串列。
當channel完成register、active、read等操作時,會觸發pipeline的相應方法。
1、當channel註冊到selector時,觸發pipeline的fireChannelRegistered方法。
2、當channel的socket繫結完成時,觸發pipeline的fireChannelActive方法。
3、當有客戶端請求時,觸發pipeline的fireChannelRead方法。
4、當本次客戶端請求,pipeline執行完fireChannelRead,觸發pipeline的fireChannelReadComplete方法。
接下去看看pipeline是如何組織並執行handler對應的方法。
DefaultChannelPipeline
其中DefaultChannelHandlerContext儲存了當前handler的上下文,如channel、pipeline等資訊,預設實現了head和tail。
-
class DefaultChannelPipeline implements ChannelPipeline {
-
final Channel channel; // pipeline所屬的channel
-
//head和tail都是handler上下文
-
final DefaultChannelHandlerContext head;
-
final DefaultChannelHandlerContext tail;
-
...
-
public DefaultChannelPipeline(AbstractChannel channel) {
-
if (channel == null) {
-
throw new NullPointerException("channel");
-
}
-
this.channel = channel;
-
-
tail = new TailContext(this);
-
head = new HeadContext(this);
-
-
head.next = tail;
-
tail.prev = head;
-
}
-
}
1、TailContext實現了ChannelOutboundHandler介面。
2、HeadContext實現了ChannelInboundHandler介面。
3、head和tail形成了一個連結串列。
對於Inbound的操作,當channel註冊到selector時,觸發pipeline的fireChannelRegistered,從head開始遍歷,找到實現了ChannelInboundHandler介面的handler,並執行其fireChannelRegistered方法。
-
public ChannelPipeline fireChannelRegistered() {
-
head.fireChannelRegistered();
-
return this;
-
}
-
-
public ChannelHandlerContext fireChannelRegistered() {
-
final DefaultChannelHandlerContext next = findContextInbound();
-
EventExecutor executor = next.executor();
-
if (executor.inEventLoop()) {
-
next.invokeChannelRegistered();
-
} else {
-
executor.execute(new Runnable() {
-
public void run() {
-
next.invokeChannelRegistered();
-
}
-
});
-
}
-
return this;
-
}
-
-
private DefaultChannelHandlerContext findContextInbound() {
-
DefaultChannelHandlerContext ctx = this;
-
do {
-
ctx = ctx.next;
-
} while (!(ctx.handler() instanceof ChannelInboundHandler));
-
return ctx;
-
}
-
-
private void invokeChannelRegistered() {
-
try {
-
((ChannelInboundHandler) handler()).channelRegistered(this);
-
} catch (Throwable t) {
-
notifyHandlerException(t);
-
}
-
}
假如我們通過pipeline的addLast方法新增一個inboundHandler實現。
-
public class ClientHandler extends ChannelInboundHandlerAdapter {
-
public void channelRegistered(ChannelHandlerContext ctx)
-
throws Exception {
-
super.channelRegistered(ctx);
-
System.out.println(" ClientHandler registered channel ");
-
}
-
}
當channel註冊完成時會觸發pipeline的channelRegistered方法,從head開始遍歷,找到ClientHandler,並執行channelRegistered方法。
對於Outbound的操作,則從tail向前遍歷,找到實現ChannelOutboundHandler介面的handler,具體實現和Inbound一樣。
服務啟動過程中,ServerBootstrap在init方法中,會給ServerSocketChannel的pipeline新增ChannelInitializer物件,其中ChannelInitializer繼承ChannelInboundHandlerAdapter,並實現了ChannelInboundHandler介面,所以當ServerSocketChannel註冊到selector之後,會觸發其channelRegistered方法。
-
public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
-
initChannel((C) ctx.channel());
-
ctx.pipeline().remove(this);
-
ctx.fireChannelRegistered();
-
}
-
-
public void initChannel(Channel ch) throws Exception {
-
ChannelPipeline pipeline = ch.pipeline();
-
ChannelHandler handler = handler();
-
if (handler != null) {
-
pipeline.addLast(handler);
-
}
-
pipeline.addLast(new ServerBootstrapAcceptor(
-
currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
-
}
在initChannel實現中,新增ServerBootstrapAcceptor例項到pipeline中。
ServerBootstrapAcceptor繼承自ChannelInboundHandlerAdapter,負責把接收到的客戶端socketChannel註冊到childGroup中,由childGroup中的eventLoop負責資料處理。
-
public void channelRead(ChannelHandlerContext ctx, Object msg) {
-
final Channel child = (Channel) msg;
-
-
child.pipeline().addLast(childHandler);
-
-
for (Entry<ChannelOption<?>, Object> e: childOptions) {
-
try {
-
if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
-
logger.warn("Unknown channel option: " + e);
-
}
-
} catch (Throwable t) {
-
logger.warn("Failed to set a channel option: " + child, t);
-
}
-
}
-
-
for (Entry<AttributeKey<?>, Object> e: childAttrs) {
-
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
-
}
-
-
try {
-
childGroup.register(child).addListener(new ChannelFutureListener() {
-
public void operationComplete(ChannelFuture future) throws Exception {
-
if (!future.isSuccess()) {
-
forceClose(child, future.cause());
-
}
-
}
-
});
-
} catch (Throwable t) {
-
forceClose(child, t);
-
}
-
}