1. 程式人生 > >深入淺出Netty:read

深入淺出Netty:read

本系列:

boss執行緒主要負責監聽並處理accept事件,將socketChannel註冊到work執行緒的selector,由worker執行緒來監聽並處理read事件,本節主要分析Netty如何處理read事件。

accept->read

當work執行緒的selector檢測到OP_READ事件發生時,觸發read操作。

12345678 //NioEventLoop  if((readyOps&(SelectionKey.OP_READ|SelectionKey.OP_ACCEPT))!=0||readyOps==0){unsafe.read();if(!ch.isOpen()){// Connection already closed - no need to handle write.  return;}}

該read方法定義在類NioByteUnsafe中。

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182 //AbstractNioByteChannel.NioByteUnsafepublicfinalvoidread(){finalChannelConfig config=config();if(!config.isAutoRead
()&&!isReadPending()){// ChannelConfig.setAutoRead(false) was called in the meantimeremoveReadOp();return;}finalChannelPipeline pipeline=pipeline();finalByteBufAllocator allocator=config.getAllocator();finalintmaxMessagesPerRead=config.getMaxMessagesPerRead();RecvByteBufAllocator.Handle allocHandle=this.allocHandle;if(allocHandle==null){this.allocHandle=allocHandle=config.getRecvByteBufAllocator().newHandle();}ByteBuf byteBuf=null;intmessages=0;booleanclose=false;try{inttotalReadAmount=0;booleanreadPendingReset=false;do{byteBuf=allocHandle.allocate(allocator);intwritable=byteBuf.writableBytes();intlocalReadAmount=doReadBytes(byteBuf);if(localReadAmount<=0){// not was read release the bufferbyteBuf.release();byteBuf=null;close=localReadAmount<0;break;}if(!readPendingReset){readPendingReset=true;setReadPending(false);}pipeline.fireChannelRead(byteBuf);byteBuf=null;if(totalReadAmount>=Integer.MAX_VALUE-localReadAmount){// Avoid overflow.totalReadAmount=Integer.MAX_VALUE;break;}totalReadAmount+=localReadAmount;// stop readingif(!config.isAutoRead()){break;}if(localReadAmount<writable){// Read less than what the buffer can hold,// which might mean we drained the recv buffer completely.break;}}while(++messages<maxMessagesPerRead);pipeline.fireChannelReadComplete();allocHandle.record(totalReadAmount);if(close){closeOnRead(pipeline);close=false;}}catch(Throwablet){handleReadException(pipeline,byteBuf,t,close);}finally{// Check if there is a readPending which was not processed yet.// This could be for two reasons:// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method//// See https://github.com/netty/netty/issues/2254if(!config.isAutoRead()&&!isReadPending()){removeReadOp();}}}

1、allocHandle負責自適應調整當前快取分配的大小,以防止快取分配過多或過少,先看看AdaptiveRecvByteBufAllocator內部實現:

12345678 publicclassAdaptiveRecvByteBufAllocatorimplementsRecvByteBufAllocator{staticfinalintDEFAULT_MINIMUM=64;staticfinalintDEFAULT_INITIAL=1024;staticfinalintDEFAULT_MAXIMUM=65536;privatestaticfinalintINDEX_INCREMENT=4;privatestaticfinalintINDEX_DECREMENT=1;privatestaticfinalint[]SIZE_TABLE;}

SIZE_TABLE:按照從小到大的順序預先儲存可以分配的快取大小。
從16開始,每次累加16,直到496,接著從512開始,每次增大一倍,直到溢位。
DEFAULT_MINIMUM:最小快取(64),在SIZE_TABLE中對應的下標為3。
DEFAULT_MAXIMUM :最大快取(65536),在SIZE_TABLE中對應的下標為38。
DEFAULT_INITIAL :初始化快取大小,第一次分配快取時,由於沒有上一次實際收到的位元組數做參考,需要給一個預設初始值。
INDEX_INCREMENT:上次預估快取偏小,下次index的遞增值。
INDEX_DECREMENT :上次預估快取偏大,下次index的遞減值。

2、allocHandle.allocate(allocator) 申請一塊指定大小的記憶體。

1234 //AdaptiveRecvByteBufAllocator.HandleImplpublicByteBuf allocate(ByteBufAllocator alloc){returnalloc.ioBuffer(nextReceiveBufferSize);}

通過ByteBufAllocator的ioBuffer方法申請快取。

1234567 //AbstractByteBufAllocatorpublicByteBuf ioBuffer(intinitialCapacity){if(PlatformDependent.hasUnsafe()){returndirectBuffer(initialCapacity);}returnheapBuffer(initialCapacity);}

根據平臺是否支援unsafe,選擇使用直接實體記憶體還是堆上記憶體。

direct buffer方案:

1234567891011121314151617181920212223 //AbstractByteBufAllocatorpublicByteBuf directBuffer(intinitialCapacity){returndirectBuffer(initialCapacity,Integer.MAX_VALUE);}publicByteBuf directBuffer(intinitialCapacity,intmaxCapacity){if(initialCapacity==0&&maxCapacity==0){returnemptyBuf;}validate(initialCapacity,maxCapacity);returnnewDirectBuffer(initialCapacity,maxCapacity);}//UnpooledByteBufAllocatorprotectedByteBuf newDirectBuffer(intinitialCapacity,intmaxCapacity){ByteBuf buf;if(PlatformDependent.hasUnsafe()){buf=newUnpooledUnsafeDirectByteBuf(this,initialCapacity,maxCapacity);}else{buf=newUnpooledDirectByteBuf(this,initialCapacity,maxCapacity);}returntoLeakAwareBuffer(buf);}

UnpooledUnsafeDirectByteBuf是如何實現快取管理的?對Nio的ByteBuffer進行了封裝,通過ByteBuffer的allocateDirect方法實現快取的申請。

123456789101112131415161718192021222324 protectedUnpooledUnsafeDirectByteBuf(ByteBufAllocator alloc,ByteBuffer initialBuffer,intmaxCapacity){//判斷邏輯已經忽略this.alloc=alloc;setByteBuffer(allocateDirect(initialCapacity));}protectedByteBuffer allocateDirect(intinitialCapacity){returnByteBuffer.allocateDirect(initialCapacity);}privatevoidsetByteBuffer(ByteBuffer buffer){ByteBuffer oldBuffer=this.buffer;if(oldBuffer!=null){if(doNotFree){doNotFree=false;}else{freeDirect(oldBuffer);}}this.buffer=buffer;memoryAddress=PlatformDependent.directBufferAddress(buffer);

相關推薦

深入淺出Nettyread

本系列: boss執行緒主要負責監聽並處理accept事件,將socketChannel註冊到work執行緒的selector,由worker執行緒來監聽並處理read事件,本節主要分析Netty如何處理read事件。 accept->read 當wor

深入淺出Nettywrite

上一章節中,分析了Netty如何處理read事件,本節分析Netty如何把資料寫會客戶端。 把資料返回客戶端,需要經歷三個步驟: 1、申請一塊快取buf,寫入資料。 2、將buf儲存到ChannelOutboundBuffer中。 3、將ChannelOutboundBuffer中的buff輸出到soc

深入淺出NettyNioEventLoop

本系列: 上一章節中,我們分析了Netty服務的啟動過程,本章節分析Netty的eventLoop是如工作的。 NioEventLoop中維護了一個執行緒,執行緒啟動時會呼叫NioEventLoop的run方法,執行I/O任務和非I/O任務。 I/O任務即se

深入淺出Netty服務啟動

本系列: 本節主要分析server的啟動過程。 Netty是基於Nio實現的,所以也離不開selector、serverSocketChannel、socketChannel和selectKey等,只不過Netty把這些實現都封裝在了底層。 從示例可以看出,一切

深入淺出NettyChannelPipeline

每個channel內部都會持有一個ChannelPipeline物件pipeline。pipeline預設實現DefaultChannelPipeline內部維護了一個DefaultChannelHandlerContext連結串列。 當channel完成register、active、read等操作時,

深入淺出Nettyaccept

本系列: 本章節分析服務端如何accept客戶端的connect請求。 在《章節中,已經分析了NioEventLoop的工作機制,當有客戶端connect請求,selector可以返回其對應的SelectionKey,方法processSelectedKeys進行

深入淺出Netty記憶體管理PoolSubpage

本系列: 上一節中分析瞭如何在poolChunk中分配一塊大於pageSize的記憶體,但在實際應用中,存在很多分配小記憶體的情況,如果也佔用一個page,明顯很浪費。針對這種情況,Netty提供了PoolSubpage把poolChunk的一個page節點8k

深入淺出Netty記憶體管理PoolChunk

多年之前,從C記憶體的手動管理上升到java的自動GC,是歷史的巨大進步。然而多年之後,netty的記憶體實現又曲線的回到了手動管理模式,正印證了馬克思哲學觀:社會總是在螺旋式前進的,沒有永遠的最好。的確,就記憶體管理而言,GC給程式設計師帶來的價值是不言而喻的,

深入淺出Netty記憶體管理PoolArena

前面分別分析了PoolChunk、PoolSubpage和PoolChunkList,本文主要分析PoolArena。 PoolArena 應用層的記憶體分配主要通過如下實現,但最終還是委託給PoolArena實現。 PooledByteBufAllocat

深入淺出Netty記憶體管理PoolChunkList

前面兩篇分別分析了PoolChunk和PoolSubpage的實現,本文主要分析管理PoolChunk生命週期的PoolChunkList。 PoolChunkList PoolChunkList負責管理多個chunk的生命週期,在此基礎上對記憶體分配進行進一步的優化。

CentOS7使用ssh不能登錄,報錯Read from socket failed: Connection reset by peer

read from socket failed: connection reset by peer使用xshell登錄CentOS7,不能登錄,使用另外一臺Linux主機,telent 22端口是同的,ssh連接報以下錯誤:Read from socket failed: Connection reset b

深入淺出講解php的socket通信

刪除 不一定 電話鈴 例子 通過 另一個 一次 函數返回 ima 對TCP/IP、UDP、Socket編程這些詞你不會很陌生吧?隨著網絡技術的發展,這些詞充斥著我們的耳朵。那麽我想問:1. 什麽是TCP/IP、UDP?2. Socke

深入淺出CSSDiv(一)

指定 增加 src 深入 lock alt 舉例 gin width 這個系列是學習筆記,簡明記錄結論性的知識。 新建一個層時,border為零,margin為0,padding為0,如果不指定寬度(width),則自動100%填充父元素。 三、層與父元素的關系 1.

下載-深入淺出Netty源碼剖析、Netty實戰高性能分布式RPC、NIO+Netty5各種RPC架構實戰演練三部曲視頻教程

分布式 sta nio png 分享 net alt 性能 架構 下載-深入淺出Netty源碼剖析、Netty實戰高性能分布式RPC、NIO+Netty5各種RPC架構實戰演練三部曲視頻教程 第一部分:入淺出Netty源碼剖析 第二部分:Netty實戰高性能分布式R

pip安裝selenium報錯Read timed out

cnblogs except clas https lib def connect pack spa 今天打算把selenium降級重新安裝,發現安裝時總是失敗,報如下錯誤: raise ReadTimeoutError(self._pool, None, ‘Read t

深入淺出MyBatisJDBC和MyBatis介紹

mybatis最近在休陪產假,時間比較零碎,準備看2本書充實下,一本是「深入淺出MyBatis:技術原理與實踐」,一本是「RabbitMQ實戰:高效部署分布式消息隊列」,為了加深記憶和理解,會進行整理、擴展和記錄。 看書的目標不是把所有的細節都記住,而是從整體上了解一個技術能做什麽,包含的特性、基本模塊,實現

深入淺出MyBatis反射和動態代理

mybatis前三篇詳細總結了Mybatis的基本特性、常用配置、映射器,相對於Hibernate,映射器的配置相對復雜,但有很好的靈活性和擴展性,可以應對各種業務場景。熟練掌握這些內容,可以流暢的使用MyBatis進行開發了。 後面準備介紹MyBatis的解析和運行原理以及自定義插件,今天看了書籍的這部分,

深入淺出MyBatis「映射器」全了解

mybatis本篇文章是「深入淺出MyBatis:技術原理與實踐」書籍的總結筆記。 上一篇總結了MyBatis的配置,詳細說明了各個配置項,其中提到了映射器,它是MyBatis最強大的工具,也是使用最多的工具。 通過映射器,可以很容易的進行數據的增刪改查操作,我們抽象下進行這些操作的關鍵點:傳遞查詢參數、組

深入淺出MyBatisMyBatis與Spring集成及實用場景

mybatis本系列是「深入淺出MyBatis:技術原理與實踐」書籍的總結筆記。 本篇是「深入淺出MyBatis」系列的最後一篇,主要介紹與Spring的集成,以及工作中的一些實用場景。 介紹之前,先整體總結下該系列的內容和寫作思路。 MyBatis是一個框架,封裝了數據庫相關的操作,給我們開發人員帶來了極

第二章 從鍵盤或文件中獲取標準輸入read命令

read命令 從鍵盤或文件中獲取標準輸入 第二章 從鍵盤或文件中獲取標準輸入:read命令 read命令 從鍵盤讀取變量的值,通常用在shell腳本中與用戶進行交互的場合。該命令可以一次讀多個變量的值,變量和輸入的值都需要使用空格隔開。在read命令後面,如果沒有指定變量名,讀取的數據將被自動賦值給