
Netty Made Simple: write

The previous chapter analyzed how Netty handles read events. This chapter analyzes how Netty writes data back to the client.

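Returning data to the client takes three steps:

  • 1. Allocate a buffer (buf) and write the data into it.
  • 2. Store the buf in the ChannelOutboundBuffer.
  • 3. Write the bufs held by the ChannelOutboundBuffer out to the socketChannel.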
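    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.util.ReferenceCountUtil;

    // Inside an inbound handler.
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // The inbound message is not used in this example, so release it right away.
        ReferenceCountUtil.release(msg);

        // Step 1: allocate three 4-byte buffers and write one int into each.
        ByteBuf buf1 = ctx.alloc().buffer(4);
        buf1.writeInt(1);

        ByteBuf buf2 = ctx.alloc().buffer(4);
        buf2.writeInt(2);

        ByteBuf buf3 = ctx.alloc().buffer(4);
        buf3.writeInt(3);

        // Step 2: write() only stores the buffers in the ChannelOutboundBuffer.
        ctx.write(buf1);
        ctx.write(buf2);
        ctx.write(buf3);

        // Step 3: flush() writes the stored buffers to the socket.
        ctx.flush();
    }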
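To make the split between writing and flushing concrete, here is a minimal sketch of the three steps that uses only the JDK. It is not Netty code: `WriteThenFlushSketch` is a hypothetical class, a plain `ArrayDeque` stands in for the ChannelOutboundBuffer, and a `WritableByteChannel` over a byte array stands in for the socketChannel.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Queue;

// Hypothetical stand-in for the write path; not a Netty class.
final class WriteThenFlushSketch {
    // Plays the role of ChannelOutboundBuffer: buffers wait here until flush().
    private final Queue<ByteBuffer> pending = new ArrayDeque<>();
    private final WritableByteChannel channel;

    WriteThenFlushSketch(WritableByteChannel channel) {
        this.channel = channel;
    }

    // Steps 1 and 2: allocate a buffer, fill it, queue it. Nothing reaches the channel yet.
    void write(int value) {
        ByteBuffer buf = ByteBuffer.allocate(4);
        buf.putInt(value);
        buf.flip(); // switch the buffer from filling to draining
        pending.add(buf);
    }

    // Step 3: drain every queued buffer into the channel, in order.
    void flush() {
        try {
            ByteBuffer buf;
            while ((buf = pending.poll()) != null) {
                while (buf.hasRemaining()) {
                    channel.write(buf);
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    int pendingCount() {
        return pending.size();
    }

    // Mirrors the handler above: three writes followed by one flush.
    static byte[] demo() {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        WriteThenFlushSketch out = new WriteThenFlushSketch(Channels.newChannel(sink));
        out.write(1);
        out.write(2);
        out.write(3);
        out.flush();
        return sink.toByteArray();
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(demo()));
    }
}
```

In this sketch the sink stays empty until `flush()` is called, which is the same observable behaviour as calling `ctx.write()` three times and `ctx.flush()` once.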

Why does the buf have to be stored in the ChannelOutboundBuffer first?

Implementation of ctx.write():

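    // AbstractChannelHandlerContext.java
    public ChannelFuture write(Object msg) {
        return write(msg, newPromise());
    }

    // The two-argument write(msg, promise) overload is omitted in this excerpt;
    // it ends up in the private method below with flush = false.
    private void write(Object msg, boolean flush, ChannelPromise promise) {
        AbstractChannelHandlerContext next = findContextOutbound();
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            // Already on the channel's event loop: invoke the next handler directly.
            next.invokeWrite(msg, promise);
            if (flush) {
                next.invokeFlush();
            }
        } else {
            // Called from another thread: hand the work to the event loop as a task.
            AbstractWriteTask task;
            if (flush) {
                task = WriteAndFlushTask.newInstance(next, msg, promise);
            } else {
                task = WriteTask.newInstance(next, msg, promise);
            }
            safeExecute(executor, task, promise, msg);
        }
    }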
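The inEventLoop() branch is the part of this method that is easiest to overlook, so here is a small JDK-only sketch of the same decision. `EventLoopDispatchSketch` and its `dispatch` method are hypothetical names, and a single-threaded executor stands in for the channel's event loop; the real classes do considerably more.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for an event loop; not a Netty class.
final class EventLoopDispatchSketch {
    private volatile Thread loopThread;

    // One worker thread plays the role of the channel's event loop.
    private final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "sketch-event-loop");
        t.setDaemon(true);
        loopThread = t;
        return t;
    });

    boolean inEventLoop() {
        return Thread.currentThread() == loopThread;
    }

    // Same decision as in write(): run inline on the loop thread, otherwise queue a task.
    // The return value only exists to make the chosen path observable.
    String dispatch(Runnable action) {
        if (inEventLoop()) {
            action.run();
            return "inline";
        }
        executor.execute(action);
        return "queued";
    }

    // Dispatches once from the calling thread and once from inside the loop thread.
    static String[] demo() {
        EventLoopDispatchSketch loop = new EventLoopDispatchSketch();
        String[] paths = new String[2];
        CountDownLatch done = new CountDownLatch(1);
        paths[0] = loop.dispatch(() -> {
            paths[1] = loop.dispatch(() -> { });
            done.countDown();
        });
        try {
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for the loop thread");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            loop.executor.shutdown();
        }
        return paths;
    }

    public static void main(String[] args) {
        String[] paths = demo();
        System.out.println(paths[0] + ", " + paths[1]);
    }
}
```

The point of the pattern is that all work on a channel ends up on one thread, so the data structures behind it do not need locks on the write path.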

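By default, that is, when no other outbound handler sits in between, findContextOutbound() ends up at the pipeline's head node, and the head's write method is invoked.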

    // HeadContext.java
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        unsafe.write(msg, promise);
    }

    // AbstractUnsafe
    public final void write(Object msg, ChannelPromise promise) {
        ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
        if (outboundBuffer == null) {
            safeSetFailure(promise, CLOSED_CHANNEL_EXCEPTION);
            ReferenceCountUtil.release(msg);
            return;
        }

        int size;
        try {
            msg = filterOutboundMessage(msg);
            size = estimatorHandle().size(msg);
            if (size < 0) {
                size = 0;
            }
        } catch (Throwable t) {
            safeSetFailure(promise, t);
            ReferenceCountUtil.release(msg);
            return;
        }

        outboundBuffer.addMessage(msg, size, promise);
    }

The outboundBuffer is instantiated together with the Unsafe object, and every msg ends up wrapped inside it.

Internally, ChannelOutboundBuffer maintains a linked list of Entry objects and wraps each msg in an Entry.

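1. unflushedEntry: points to the first entry that has not been flushed yet, which is the head of the list until a flush happens
2. tailEntry: points to the tail of the list
3. totalPendingSize: the total number of bytes of the stored msgs
4. unwritable: the flag that marks the channel as not writable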

    public void addMessage(Object msg, int size, ChannelPromise promise) {
        Entry entry = Entry.newInstance(msg, size, total(msg), promise);
        if (tailEntry == null) {
            flushedEntry = null;
            tailEntry = entry;
        } else {
            Entry tail = tailEntry;
            tail.next = entry;
            tailEntry = entry;
        }
        if (unflushedEntry == null) {
            unflushedEntry = entry;
        }

        // increment pending bytes after adding message to the unflushed arrays.
        // See https://github.com/netty/netty/issues/1619
        incrementPendingOutboundBytes(size, false);
    }

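Entry.newInstance returns an Entry instance. Netty pools Entry objects: an Entry that is no longer in use has to be cleared and recycled. Is that because instantiating an Entry is expensive? A more likely reason is that one Entry is needed for every write, so reusing instances cuts down on allocation and garbage-collection pressure on a hot path.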
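The clear-and-recycle idea can be sketched with a very small pool. This is only an illustration under simplifying assumptions: `EntryPoolSketch` is a hypothetical class, it is single-threaded, and the `Entry` here carries only two of the fields. Netty's own pooling is more elaborate than this.

```java
import java.util.ArrayDeque;

// Hypothetical, single-threaded object pool; not Netty's implementation.
final class EntryPoolSketch {
    static final class Entry {
        Object msg;
        int size;
        Entry next;
    }

    private final ArrayDeque<Entry> pool = new ArrayDeque<>();

    // Reuse a pooled Entry when one is available, otherwise allocate a new one.
    Entry newInstance(Object msg, int size) {
        Entry e = pool.poll();
        if (e == null) {
            e = new Entry();
        }
        e.msg = msg;
        e.size = size;
        return e;
    }

    // Clear every field before pooling so the Entry keeps no reference to old data.
    void recycle(Entry e) {
        e.msg = null;
        e.size = 0;
        e.next = null;
        pool.push(e);
    }

    public static void main(String[] args) {
        EntryPoolSketch pool = new EntryPoolSketch();
        Entry first = pool.newInstance("buf1", 4);
        pool.recycle(first);
        Entry second = pool.newInstance("buf2", 4);
        System.out.println("same instance reused: " + (first == second));
    }
}
```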

A new entry is always appended at the tail of the list, and tailEntry is updated to point to it.
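The pointer updates are easier to follow on a stripped-down list. The sketch below keeps only the three pointers and the linking logic of addMessage; `OutboundListSketch` is a hypothetical class, and sizes, promises and pooling are left out on purpose.

```java
// Hypothetical reduction of the Entry list; not the Netty class.
final class OutboundListSketch {
    static final class Entry {
        final Object msg;
        Entry next;

        Entry(Object msg) {
            this.msg = msg;
        }
    }

    Entry flushedEntry;
    Entry unflushedEntry;
    Entry tailEntry;

    // Same linking steps as addMessage: append at the tail, remember the first unflushed entry.
    void addMessage(Object msg) {
        Entry entry = new Entry(msg);
        if (tailEntry == null) {
            flushedEntry = null;
        } else {
            tailEntry.next = entry;
        }
        tailEntry = entry;
        if (unflushedEntry == null) {
            unflushedEntry = entry;
        }
    }

    // Walks the list from the first unflushed entry to the tail.
    String dump() {
        StringBuilder sb = new StringBuilder();
        for (Entry e = unflushedEntry; e != null; e = e.next) {
            if (sb.length() > 0) {
                sb.append(" -> ");
            }
            sb.append(e.msg);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        OutboundListSketch list = new OutboundListSketch();
        list.addMessage("buf1");
        list.addMessage("buf2");
        list.addMessage("buf3");
        System.out.println(list.dump());
    }
}
```

After the three writes from the handler example, unflushedEntry still points at the first buffer and tailEntry at the third.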


    private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
        if (size == 0) {
            return;
        }

        long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
        if (newWriteBufferSize >= channel.config().getWriteBufferHighWaterMark()) {
            setUnwritable(invokeLater);
        }
    }

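incrementPendingOutboundBytes uses CAS to update the totalPendingSize field, then checks whether the new value has reached the writeBufferHighWaterMark threshold, which defaults to 65536 bytes (64 KiB). If totalPendingSize >= 65536, it sets unwritable to 1, again via CAS, and fires a ChannelWritabilityChanged event.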
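The high-water-mark check can be reproduced with the JDK's atomic field updaters. The sketch below is an assumption-laden reduction: `WatermarkSketch` is a hypothetical class, the threshold is hard-coded to the 65536 default mentioned above, and a plain counter stands in for firing the ChannelWritabilityChanged event.

```java
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;

// Hypothetical reduction of the pending-bytes accounting; not the Netty class.
final class WatermarkSketch {
    static final int HIGH_WATER_MARK = 64 * 1024; // 65536, the default named in the text

    private static final AtomicLongFieldUpdater<WatermarkSketch> TOTAL_PENDING_SIZE_UPDATER =
            AtomicLongFieldUpdater.newUpdater(WatermarkSketch.class, "totalPendingSize");
    private static final AtomicIntegerFieldUpdater<WatermarkSketch> UNWRITABLE_UPDATER =
            AtomicIntegerFieldUpdater.newUpdater(WatermarkSketch.class, "unwritable");

    private volatile long totalPendingSize;
    private volatile int unwritable;

    // Stand-in for the event: counts how often writability flipped to "not writable".
    int writabilityChangedEvents;

    void incrementPendingOutboundBytes(long size) {
        if (size == 0) {
            return;
        }
        long newSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
        // The compare-and-set succeeds only on the 0 -> 1 transition,
        // so the event is raised once per crossing rather than once per write.
        if (newSize >= HIGH_WATER_MARK && UNWRITABLE_UPDATER.compareAndSet(this, 0, 1)) {
            writabilityChangedEvents++;
        }
    }

    boolean isWritable() {
        return unwritable == 0;
    }

    long pendingBytes() {
        return totalPendingSize;
    }

    public static void main(String[] args) {
        WatermarkSketch buffer = new WatermarkSketch();
        buffer.incrementPendingOutboundBytes(12);
        System.out.println("writable after 12 bytes: " + buffer.isWritable());
        buffer.incrementPendingOutboundBytes(HIGH_WATER_MARK);
        System.out.println("writable after crossing the mark: " + buffer.isWritable());
    }
}
```

A handler that respects this flag would stop writing while the channel reports itself as not writable, which keeps a slow peer from causing unbounded memory growth.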

At this point, all of the buf data is sitting in the outboundBuffer.

Implementation of ctx.flush():

    public ChannelHandlerContext flush() {
        final AbstractChannelHandlerContext next = findContextOutbound();
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeFlush();
        } else {
            Runnable task = next.invokeFlushTask;
            if (task == null) {
                next.invokeFlushTask = task = new Runnable() {
                    @Override
                    public void run() {
                        next.invokeFlush();
                    }
                };
            }
            safeExecute(executor, task, channel().voidPromise(), null);
        }

        return this;
    }

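As with write, when no other outbound handler sits in between, findContextOutbound() ends up at the pipeline's head node, and the head's flush method is invoked.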

    // HeadContext.java
    public void flush(ChannelHandlerContext ctx) throws Exception {
        unsafe.flush();
    }

    // AbstractUnsafe
    public final void flush() {
        assertEventLoop();

        ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
        if (outboundBuffer == null) {
            return;
        }

        outboundBuffer.addFlush();
        flush0();
    }

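The addFlush method marks the msgs that were added during the write phase as flushed. To be honest, I am not sure what this marking step is for. One plausible reading is that it draws the line between the entries this flush should write out and the entries added by later write calls, which stay unflushed until the next flush.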
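A reduced model of the list shows what such a marking step could achieve. This is a sketch under assumptions, not the Netty source: `AddFlushSketch` is a hypothetical class, promises are left out, and `flushedMessages()` is a helper invented here purely to inspect the result.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the flushed/unflushed boundary; not the Netty class.
final class AddFlushSketch {
    static final class Entry {
        final Object msg;
        Entry next;

        Entry(Object msg) {
            this.msg = msg;
        }
    }

    Entry flushedEntry;
    Entry unflushedEntry;
    Entry tailEntry;
    int flushed; // number of entries currently marked for flushing

    void addMessage(Object msg) {
        Entry entry = new Entry(msg);
        if (tailEntry == null) {
            flushedEntry = null;
        } else {
            tailEntry.next = entry;
        }
        tailEntry = entry;
        if (unflushedEntry == null) {
            unflushedEntry = entry;
        }
    }

    // Marks everything written so far as flushed and resets the unflushed pointer.
    void addFlush() {
        Entry entry = unflushedEntry;
        if (entry == null) {
            return;
        }
        if (flushedEntry == null) {
            flushedEntry = entry;
        }
        for (; entry != null; entry = entry.next) {
            flushed++;
        }
        unflushedEntry = null;
    }

    // Inspection helper (invented for this sketch): the msgs a flush would write out.
    List<Object> flushedMessages() {
        List<Object> out = new ArrayList<>();
        Entry e = flushedEntry;
        for (int i = 0; i < flushed; i++) {
            out.add(e.msg);
            e = e.next;
        }
        return out;
    }

    public static void main(String[] args) {
        AddFlushSketch buffer = new AddFlushSketch();
        buffer.addMessage("buf1");
        buffer.addMessage("buf2");
        buffer.addFlush();
        buffer.addMessage("buf3"); // arrives after the flush was requested
        System.out.println("marked for flush: " + buffer.flushedMessages());
        System.out.println("still unflushed: " + buffer.unflushedEntry.msg);
    }
}
```

In this model a msg written after addFlush stays on the same list but outside the flushed range, so it waits for the next flush.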

Let's go straight to the flush0 method:

    protected final void flush0() {
        // Flush immediately only when there's no pending flush.
        // If there's a pending flush operation, event loop will call forceFlush() later,
        // and thus there's no need to call it now.
        if (isFlushPending()) {
            return;
        }
        super.flush0();
    }

    private boolean isFlushPending() {
        SelectionKey selectionKey = selectionKey();
        return selectionKey.isValid() && (selectionKey.interestOps() & SelectionKey.OP_WRITE) != 0;
    }

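1. If the selectionKey is valid and already registered for OP_WRITE, a flush is pending and the event loop will call forceFlush() later, so the method returns immediately.
2. Otherwise the flush is performed right away.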
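The check in isFlushPending is a plain bitmask test on the key's interest set. The following sketch isolates it so it can be run without a selector; `FlushPendingSketch` is a hypothetical class, and the key's validity and interest ops are passed in as arguments instead of being read from a real SelectionKey.

```java
import java.nio.channels.SelectionKey;

// Hypothetical helper that isolates the bitmask test; not a Netty class.
final class FlushPendingSketch {
    // True when the key is valid and OP_WRITE is part of its interest set.
    static boolean isFlushPending(boolean keyValid, int interestOps) {
        return keyValid && (interestOps & SelectionKey.OP_WRITE) != 0;
    }

    // Mirrors the two cases of flush0(): defer to the event loop, or flush right away.
    static String flush0(boolean keyValid, int interestOps) {
        return isFlushPending(keyValid, interestOps) ? "deferred" : "flushed now";
    }

    public static void main(String[] args) {
        int readOnly = SelectionKey.OP_READ;
        int readAndWrite = SelectionKey.OP_READ | SelectionKey.OP_WRITE;
        System.out.println("OP_READ only: " + flush0(true, readOnly));
        System.out.println("OP_READ | OP_WRITE: " + flush0(true, readAndWrite));
    }
}
```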