1. 程式人生 > >Netty原始碼分析之ChannelPipeline

Netty原始碼分析之ChannelPipeline

每個channel內部都會持有一個ChannelPipeline物件pipeline. pipeline預設實現DefaultChannelPipeline內部維護了一個DefaultChannelHandlerContext連結串列。

 

 

當channel完成register、active、read等操作時,會觸發pipeline的相應方法。

1、當channel註冊到selector時,觸發pipeline的fireChannelRegistered方法。

2、當channel的socket繫結完成時,觸發pipeline的fireChannelActive方法。

3、當有客戶端請求時,觸發pipeline的fireChannelRead方法。

4、當本次客戶端請求,pipeline執行完fireChannelRead,觸發pipeline的fireChannelReadComplete方法。

接下去看看pipeline是如何組織並執行handler對應的方法。

 

DefaultChannelPipeline

其中DefaultChannelHandlerContext儲存了當前handler的上下文,如channel、pipeline等資訊,預設實現了head和tail。

 
 

 

  1. class DefaultChannelPipeline implements ChannelPipeline {

  2. final Channel channel; // pipeline所屬的channel

  3. //head和tail都是handler上下文

  4. final DefaultChannelHandlerContext head;

  5. final DefaultChannelHandlerContext tail;

  6. ...

  7. public DefaultChannelPipeline(AbstractChannel channel) {

  8. if (channel == null) {

  9. throw new NullPointerException("channel");

  10. }

  11. this.channel = channel;

  12.  

  13. tail = new TailContext(this);

  14. head = new HeadContext(this);

  15.  

  16. head.next = tail;

  17. tail.prev = head;

  18. }

  19. }

 

1、TailContext實現了ChannelOutboundHandler介面。

2、HeadContext實現了ChannelInboundHandler介面。

3、head和tail形成了一個連結串列。

對於Inbound的操作,當channel註冊到selector時,觸發pipeline的fireChannelRegistered,從head開始遍歷,找到實現了ChannelInboundHandler介面的handler,並執行其fireChannelRegistered方法。

 
 

 

  1. @Override

  2. public ChannelPipeline fireChannelRegistered() {

  3. head.fireChannelRegistered();

  4. return this;

  5. }

  6.  

  7. @Override

  8. public ChannelHandlerContext fireChannelRegistered() {

  9. final DefaultChannelHandlerContext next = findContextInbound();

  10. EventExecutor executor = next.executor();

  11. if (executor.inEventLoop()) {

  12. next.invokeChannelRegistered();

  13. } else {

  14. executor.execute(new Runnable() {

  15. @Override

  16. public void run() {

  17. next.invokeChannelRegistered();

  18. }

  19. });

  20. }

  21. return this;

  22. }

  23.  

  24. private DefaultChannelHandlerContext findContextInbound() {

  25. DefaultChannelHandlerContext ctx = this;

  26. do {

  27. ctx = ctx.next;

  28. } while (!(ctx.handler() instanceof ChannelInboundHandler));

  29. return ctx;

  30. }

  31.  

  32. private void invokeChannelRegistered() {

  33. try {

  34. ((ChannelInboundHandler) handler()).channelRegistered(this);

  35. } catch (Throwable t) {

  36. notifyHandlerException(t);

  37. }

  38. }

 

假如我們通過pipeline的addLast方法新增一個inboundHandler實現。

 
 

 

  1. public class ClientHandler extends ChannelInboundHandlerAdapter {

  2. @Override

  3. public void channelRegistered(ChannelHandlerContext ctx)

  4. throws Exception {

  5. super.channelRegistered(ctx);

  6. System.out.println(" ClientHandler registered channel ");

  7. }

  8. }

 

當channel註冊完成時會觸發pipeline的channelRegistered方法,從head開始遍歷,找到ClientHandler,並執行channelRegistered方法。

對於Outbound的操作,則從tail向前遍歷,找到實現ChannelOutboundHandler介面的handler,具體實現和Inbound一樣。

服務啟動過程中,ServerBootstrap在init方法中,會給ServerSocketChannel的pipeline新增ChannelInitializer物件,其中ChannelInitializer繼承ChannelInboundHandlerAdapter,並實現了ChannelInboundHandler介面,所以當ServerSocketChannel註冊到selector之後,會觸發其channelRegistered方法。

 
 

 

  1. public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {

  2. initChannel((C) ctx.channel());

  3. ctx.pipeline().remove(this);

  4. ctx.fireChannelRegistered();

  5. }

  6.  

  7. public void initChannel(Channel ch) throws Exception {

  8. ChannelPipeline pipeline = ch.pipeline();

  9. ChannelHandler handler = handler();

  10. if (handler != null) {

  11. pipeline.addLast(handler);

  12. }

  13. pipeline.addLast(new ServerBootstrapAcceptor(

  14. currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));

  15. }

 

在initChannel實現中,新增ServerBootstrapAcceptor例項到pipeline中。

ServerBootstrapAcceptor繼承自ChannelInboundHandlerAdapter,負責把接收到的客戶端socketChannel註冊到childGroup中,由childGroup中的eventLoop負責資料處理。

 
 

 

  1. public void channelRead(ChannelHandlerContext ctx, Object msg) {

  2. final Channel child = (Channel) msg;

  3.  

  4. child.pipeline().addLast(childHandler);

  5.  

  6. for (Entry<ChannelOption<?>, Object> e: childOptions) {

  7. try {

  8. if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {

  9. logger.warn("Unknown channel option: " + e);

  10. }

  11. } catch (Throwable t) {

  12. logger.warn("Failed to set a channel option: " + child, t);

  13. }

  14. }

  15.  

  16. for (Entry<AttributeKey<?>, Object> e: childAttrs) {

  17. child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());

  18. }

  19.  

  20. try {

  21. childGroup.register(child).addListener(new ChannelFutureListener() {

  22. @Override

  23. public void operationComplete(ChannelFuture future) throws Exception {

  24. if (!future.isSuccess()) {

  25. forceClose(child, future.cause());

  26. }

  27. }

  28. });

  29. } catch (Throwable t) {

  30. forceClose(child, t);

  31. }

  32. }