1. 程式人生 > >深入淺出Netty:ChannelPipeline

深入淺出Netty:ChannelPipeline

每個channel內部都會持有一個ChannelPipeline物件pipeline。pipeline預設實現DefaultChannelPipeline內部維護了一個DefaultChannelHandlerContext連結串列。

2184951-beacd91367f1f4eb

當channel完成register、active、read等操作時,會觸發pipeline的相應方法。

1、當channel註冊到selector時,觸發pipeline的fireChannelRegistered方法。
2、當channel的socket繫結完成時,觸發pipeline的fireChannelActive方法。
3、當有客戶端請求時,觸發pipeline的fireChannelRead方法。
4、當本次客戶端請求,pipeline執行完fireChannelRead,觸發pipeline的fireChannelReadComplete方法。

接下去看看pipeline是如何組織並執行handler對應的方法。

DefaultChannelPipeline

其中DefaultChannelHandlerContext儲存了當前handler的上下文,如channel、pipeline等資訊,預設實現了head和tail。

12345678910111213141516171819 classDefaultChannelPipelineimplementsChannelPipeline{finalChannel channel;// pipeline所屬的channel//head和tail都是handler上下文finalDefaultChannelHandlerContext head;finalDefaultChannelHandlerContext tail;...publicDefaultChannelPipeline(AbstractChannel
channel){if(channel==null){thrownewNullPointerException("channel");}this.channel=channel;tail=newTailContext(this);head=newHeadContext(this);head.next=tail;tail.prev=head;}}

1、TailContext實現了ChannelOutboundHandler介面。
2、HeadContext實現了ChannelInboundHandler介面。
3、head和tail形成了一個連結串列。

對於Inbound的操作,當channel註冊到selector時,觸發pipeline的fireChannelRegistered,從head開始遍歷,找到實現了ChannelInboundHandler介面的handler,並執行其fireChannelRegistered方法。

1234567891011121314151617181920212223242526272829303132333435363738 @OverridepublicChannelPipeline fireChannelRegistered(){head.fireChannelRegistered();returnthis;}@OverridepublicChannelHandlerContext fireChannelRegistered(){finalDefaultChannelHandlerContext next=findContextInbound();EventExecutor executor=next.executor();if(executor.inEventLoop()){next.invokeChannelRegistered();}else{executor.execute(newRunnable(){@Overridepublicvoidrun(){next.invokeChannelRegistered();}});}returnthis;}privateDefaultChannelHandlerContext findContextInbound(){DefaultChannelHandlerContext ctx=this;do{ctx=ctx.next;}while(!(ctx.handler()instanceofChannelInboundHandler));returnctx;}privatevoidinvokeChannelRegistered(){try{((ChannelInboundHandler)handler()).channelRegistered(this);}catch(Throwablet){notifyHandlerException(t);}}

假如我們通過pipeline的addLast方法新增一個inboundHandler實現。

12345678 publicclassClientHandlerextendsChannelInboundHandlerAdapter{@Override  publicvoidchannelRegistered(ChannelHandlerContext ctx)throwsException{super.channelRegistered(ctx);System.out.println(" ClientHandler  registered channel ");}}

當channel註冊完成時會觸發pipeline的channelRegistered方法,從head開始遍歷,找到ClientHandler,並執行channelRegistered方法。

對於Outbound的操作,則從tail向前遍歷,找到實現ChannelOutboundHandler介面的handler,具體實現和Inbound一樣。

服務啟動過程中,ServerBootstrap在init方法中,會給ServerSocketChannel的pipeline新增ChannelInitializer物件,其中ChannelInitializer繼承ChannelInboundHandlerAdapter,並實現了ChannelInboundHandler介面,所以當ServerSocketChannel註冊到selector之後,會觸發其channelRegistered方法。

123456789101112131415 publicfinalvoidchannelRegistered(ChannelHandlerContext ctx)throwsException{initChannel((C)ctx.channel());ctx.pipeline().remove(this);ctx.fireChannelRegistered();}publicvoidinitChannel(Channel ch)throwsException{ChannelPipeline pipeline=ch.pipeline();ChannelHandler handler=handler();if(handler!=null){pipeline.addLast(handler);}pipeline.addLast(newServerBootstrapAcceptor(currentChildGroup,currentChildHandler,currentChildOptions,currentChildAttrs));}

在initChannel實現中,新增ServerBootstrapAcceptor例項到pipeline中。

ServerBootstrapAcceptor繼承自ChannelInboundHandlerAdapter,負責把接收到的客戶端socketChannel註冊到childGroup中,由childGroup中的eventLoop負責資料處理。

1234567891011121314151617181920212223242526272829303132 publicvoidchannelRead(ChannelHandlerContext ctx,Objectmsg){finalChannel child=(Channel)msg;child.pipeline().addLast(childHandler);for(Entry,Object>e:childOptions){try{if(!child.config().setOption((ChannelOption)e.getKey(),e.getValue())){logger.warn("Unknown channel option: "+e);}}catch(Throwablet){logger.warn("Failed to set a channel option: "+child,t);}}for(Entry,Object>e:childAttrs){child.attr((AttributeKey)e.getKey()).set(e.getValue());}try{childGroup.register(child).addListener(newChannelFutureListener(){@OverridepublicvoidoperationComplete(ChannelFuture future)throwsException{if(!future.isSuccess()){forceClose(child,future.cause());}}});}catch(Throwablet){forceClose(child,t);}}

END。

相關推薦

深入淺出NettyChannelPipeline

每個channel內部都會持有一個ChannelPipeline物件pipeline。pipeline預設實現DefaultChannelPipeline內部維護了一個DefaultChannelHandlerContext連結串列。 當channel完成register、active、read等操作時,

深入淺出Nettywrite

上一章節中,分析了Netty如何處理read事件,本節分析Netty如何把資料寫會客戶端。 把資料返回客戶端,需要經歷三個步驟: 1、申請一塊快取buf,寫入資料。 2、將buf儲存到ChannelOutboundBuffer中。 3、將ChannelOutboundBuffer中的buff輸出到soc

深入淺出NettyNioEventLoop

本系列: 上一章節中,我們分析了Netty服務的啟動過程,本章節分析Netty的eventLoop是如工作的。 NioEventLoop中維護了一個執行緒,執行緒啟動時會呼叫NioEventLoop的run方法,執行I/O任務和非I/O任務。 I/O任務即se

深入淺出Netty服務啟動

本系列: 本節主要分析server的啟動過程。 Netty是基於Nio實現的,所以也離不開selector、serverSocketChannel、socketChannel和selectKey等,只不過Netty把這些實現都封裝在了底層。 從示例可以看出,一切

深入淺出Nettyread

本系列: boss執行緒主要負責監聽並處理accept事件,將socketChannel註冊到work執行緒的selector,由worker執行緒來監聽並處理read事件,本節主要分析Netty如何處理read事件。 accept->read 當wor

深入淺出Nettyaccept

本系列: 本章節分析服務端如何accept客戶端的connect請求。 在《章節中,已經分析了NioEventLoop的工作機制,當有客戶端connect請求,selector可以返回其對應的SelectionKey,方法processSelectedKeys進行

Netty學習ChannelPipeline

一個{@link ChannelHandler}的列表,它處理或攔截{@link Channel}的入站事件和出站操作。 建立管道  每個通道都有自己的管道,在建立新通道時自動建立管道。 事件如何在管道中流動 下圖描述了在{@link ChannelPipeline}

深入淺出Netty記憶體管理PoolSubpage

本系列: 上一節中分析瞭如何在poolChunk中分配一塊大於pageSize的記憶體,但在實際應用中,存在很多分配小記憶體的情況,如果也佔用一個page,明顯很浪費。針對這種情況,Netty提供了PoolSubpage把poolChunk的一個page節點8k

深入淺出Netty記憶體管理PoolChunk

多年之前,從C記憶體的手動管理上升到java的自動GC,是歷史的巨大進步。然而多年之後,netty的記憶體實現又曲線的回到了手動管理模式,正印證了馬克思哲學觀:社會總是在螺旋式前進的,沒有永遠的最好。的確,就記憶體管理而言,GC給程式設計師帶來的價值是不言而喻的,

深入淺出Netty記憶體管理PoolArena

前面分別分析了PoolChunk、PoolSubpage和PoolChunkList,本文主要分析PoolArena。 PoolArena 應用層的記憶體分配主要通過如下實現,但最終還是委託給PoolArena實現。 PooledByteBufAllocat

深入淺出Netty記憶體管理PoolChunkList

前面兩篇分別分析了PoolChunk和PoolSubpage的實現,本文主要分析管理PoolChunk生命週期的PoolChunkList。 PoolChunkList PoolChunkList負責管理多個chunk的生命週期,在此基礎上對記憶體分配進行進一步的優化。

深入淺出講解php的socket通信

刪除 不一定 電話鈴 例子 通過 另一個 一次 函數返回 ima 對TCP/IP、UDP、Socket編程這些詞你不會很陌生吧?隨著網絡技術的發展,這些詞充斥著我們的耳朵。那麽我想問:1. 什麽是TCP/IP、UDP?2. Socke

深入淺出CSSDiv(一)

指定 增加 src 深入 lock alt 舉例 gin width 這個系列是學習筆記,簡明記錄結論性的知識。 新建一個層時,border為零,margin為0,padding為0,如果不指定寬度(width),則自動100%填充父元素。 三、層與父元素的關系 1.

下載-深入淺出Netty源碼剖析、Netty實戰高性能分布式RPC、NIO+Netty5各種RPC架構實戰演練三部曲視頻教程

分布式 sta nio png 分享 net alt 性能 架構 下載-深入淺出Netty源碼剖析、Netty實戰高性能分布式RPC、NIO+Netty5各種RPC架構實戰演練三部曲視頻教程 第一部分:入淺出Netty源碼剖析 第二部分:Netty實戰高性能分布式R

深入淺出MyBatisJDBC和MyBatis介紹

mybatis最近在休陪產假,時間比較零碎,準備看2本書充實下,一本是「深入淺出MyBatis:技術原理與實踐」,一本是「RabbitMQ實戰:高效部署分布式消息隊列」,為了加深記憶和理解,會進行整理、擴展和記錄。 看書的目標不是把所有的細節都記住,而是從整體上了解一個技術能做什麽,包含的特性、基本模塊,實現

深入淺出MyBatis反射和動態代理

mybatis前三篇詳細總結了Mybatis的基本特性、常用配置、映射器,相對於Hibernate,映射器的配置相對復雜,但有很好的靈活性和擴展性,可以應對各種業務場景。熟練掌握這些內容,可以流暢的使用MyBatis進行開發了。 後面準備介紹MyBatis的解析和運行原理以及自定義插件,今天看了書籍的這部分,

深入淺出MyBatis「映射器」全了解

mybatis本篇文章是「深入淺出MyBatis:技術原理與實踐」書籍的總結筆記。 上一篇總結了MyBatis的配置,詳細說明了各個配置項,其中提到了映射器,它是MyBatis最強大的工具,也是使用最多的工具。 通過映射器,可以很容易的進行數據的增刪改查操作,我們抽象下進行這些操作的關鍵點:傳遞查詢參數、組

深入淺出MyBatisMyBatis與Spring集成及實用場景

mybatis本系列是「深入淺出MyBatis:技術原理與實踐」書籍的總結筆記。 本篇是「深入淺出MyBatis」系列的最後一篇,主要介紹與Spring的集成,以及工作中的一些實用場景。 介紹之前,先整體總結下該系列的內容和寫作思路。 MyBatis是一個框架,封裝了數據庫相關的操作,給我們開發人員帶來了極

Netty 專欄】深入淺出 Netty write

把資料返回客戶端,需要經歷三個步驟: 1、申請一塊快取buf,寫入資料。 2、將buf儲存到ChannelOutboundBuffer中。 3、將ChannelOutboundBuffer中的buff輸出到socketChannel中。   public void c

深入淺出講解php的socket通訊

今天覆習了,socket程式設計,看到了一篇不錯的文章,轉發過來做個筆記。 對TCP/IP、UDP、Socket程式設計這些詞你不會很陌生吧?隨著網路技術的發展,這些詞充斥著我們的耳朵。那麼我想問: 1.         什麼是TCP/IP、UDP? 2.