
Netty in Depth: accept


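This chapter analyzes how the server accepts a client's connect request.

An earlier chapter of this series analyzed how NioEventLoop works. When a client sends a connect request, the selector returns the corresponding SelectionKey, and the processSelectedKeys method takes over the subsequent handling.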

    private void processSelectedKeys() {
        if (selectedKeys != null) {
            processSelectedKeysOptimized(selectedKeys.flip());
        } else {
            processSelectedKeysPlain(selector.selectedKeys());
        }
    }

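By default, Netty uses the optimized SelectedSelectionKeySet to store the selected keys that have pending events.

1. Internally, SelectedSelectionKeySet stores the selected keys in two SelectionKey arrays, keysA and keysB, each with an initial size of 1024.

2. The SelectedSelectionKeySet instance is mapped onto the selector's native selectedKeys and publicSelectedKeys fields, so the selector writes ready keys straight into it.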
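The two-array idea in item 1 can be illustrated with a simplified sketch. This is not Netty's actual class: the name `FlipKeySetSketch`, the use of `Object` in place of `SelectionKey`, and the doubling growth policy are assumptions made for illustration. It only aims to show why two arrays help: `flip()` hands the filled, null-terminated array to the consumer and redirects new writes to the other array.

```java
import java.util.Arrays;

/** Simplified double-buffered key set; a sketch of the idea, not Netty's implementation. */
final class FlipKeySetSketch {
    private Object[] keysA = new Object[1024];
    private Object[] keysB = new Object[1024];
    private int sizeA;
    private int sizeB;
    private boolean isA = true; // true while add() writes into keysA

    /** Appends a key to the active array, growing it so one slot always stays free for the terminator. */
    boolean add(Object key) {
        if (key == null) {
            return false;
        }
        if (isA) {
            keysA[sizeA++] = key;
            if (sizeA == keysA.length) {
                keysA = Arrays.copyOf(keysA, keysA.length * 2);
            }
        } else {
            keysB[sizeB++] = key;
            if (sizeB == keysB.length) {
                keysB = Arrays.copyOf(keysB, keysB.length * 2);
            }
        }
        return true;
    }

    /** Null-terminates the active array, returns it, and switches writes to the other array. */
    Object[] flip() {
        if (isA) {
            isA = false;
            keysA[sizeA] = null;
            sizeB = 0;
            return keysA;
        } else {
            isA = true;
            keysB[sizeB] = null;
            sizeA = 0;
            return keysB;
        }
    }
}
```

Because the returned array is null-terminated, a consumer can walk it with a plain index loop and stop at the first null, without allocating an iterator.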

    private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
        for (int i = 0;; i++) {
            final SelectionKey k = selectedKeys[i];
            if (k == null) {
                break;
            }
            // null out entry in the array to allow to have it GC'ed once the Channel close
            // See https://github.com/netty/netty/issues/2363
            selectedKeys[i] = null;

            final Object a = k.attachment();

            if (a instanceof AbstractNioChannel) {
                processSelectedKey(k, (AbstractNioChannel) a);
            } else {
                @SuppressWarnings("unchecked")
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                processSelectedKey(k, task);
            }

            if (needsToSelectAgain) {
                // null out entries in the array to allow to have it GC'ed once the Channel close
                // See https://github.com/netty/netty/issues/2363
                for (;;) {
                    i++;
                    if (selectedKeys[i] == null) {
                        break;
                    }
                    selectedKeys[i] = null;
                }

                selectAgain();
                // Need to flip the optimized selectedKeys to get the right reference to the array
                // and reset the index to -1 which will then set to 0 on the for loop
                // to start over again.
                //
                // See https://github.com/netty/netty/issues/1523
                selectedKeys = this.selectedKeys.flip();
                i = -1;
            }
        }
    }

Because the selector multiplexes I/O, a single select call can return several selected keys, so a for loop is needed to process all of them.
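The core of that loop, stripped of the Netty specifics, might look like the sketch below. The class name `NullTerminatedDrainSketch` and the `Object` element type are assumptions, and the returned list merely stands in for the per-key call to processSelectedKey. The sketch assumes the array is null-terminated; without a terminating null the loop would run past the end of the array.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch of draining a null-terminated array of ready keys. */
final class NullTerminatedDrainSketch {
    /** Visits entries up to the first null and clears each visited slot. */
    static List<Object> drain(Object[] keys) {
        List<Object> handled = new ArrayList<>();
        for (int i = 0;; i++) {
            Object k = keys[i];
            if (k == null) {
                break;          // the first null marks the end of the ready keys
            }
            keys[i] = null;     // drop the reference so the key can be garbage collected
            handled.add(k);     // stand-in for processSelectedKey(k, ...)
        }
        return handled;
    }
}
```

Clearing each slot as it is visited matches the intent of the comments in the listing: a closed channel should not stay reachable through a stale array entry.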

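Suppose a request arrives at this point: selectedKeys then contains one SelectionKey. If this part is unclear, go back and review "NIO Socket in Depth" (深入淺出Nio Socket).

1. k.attachment() returns the object that was attached when the server channel was registered with the selector. That attachment is the channel itself: strictly speaking it is Netty's NioServerSocketChannel, which wraps the JDK ServerSocketChannel.
2. If the attachment of the selected key is an AbstractNioChannel, processSelectedKey(k, (AbstractNioChannel) a) is called for the next step.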
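The attachment mechanism in the two steps above can be tried with plain JDK NIO. The sketch below is an illustration under stated assumptions, not Netty code: `AttachmentDispatchSketch` and `ServerWrapper` are hypothetical names, the wrapper plays the role of the framework-level channel, and the key is registered directly with OP_ACCEPT on a loopback port chosen by the OS.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

final class AttachmentDispatchSketch {
    /** Hypothetical stand-in for a framework channel that wraps the JDK channel. */
    static final class ServerWrapper {
        final ServerSocketChannel javaChannel;
        ServerWrapper(ServerSocketChannel javaChannel) {
            this.javaChannel = javaChannel;
        }
    }

    /** Connects one loopback client and returns how many connections were accepted. */
    static int acceptOne() {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.configureBlocking(false);
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            // Attach the wrapper so it can be recovered from the key later.
            server.register(selector, SelectionKey.OP_ACCEPT, new ServerWrapper(server));

            int accepted = 0;
            try (SocketChannel client = SocketChannel.open(server.getLocalAddress())) {
                if (selector.select(5000) == 0) {
                    return 0; // nothing became ready within the timeout
                }
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey k = it.next();
                    it.remove();
                    Object a = k.attachment();
                    // Dispatch on the attachment type, then on the ready operation.
                    if (a instanceof ServerWrapper && k.isAcceptable()) {
                        try (SocketChannel peer = ((ServerWrapper) a).javaChannel.accept()) {
                            if (peer != null) {
                                accepted++;
                            }
                        }
                    }
                }
            }
            return accepted;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("accepted connections: " + acceptOne());
    }
}
```

Attaching the wrapper at registration time is what lets the event loop go from a bare SelectionKey back to the object that knows how to handle the event, with no lookup table in between.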

    private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final NioUnsafe unsafe = ch.unsafe();
        if (!k.isValid()) {
            // close the channel if the key is not valid anymore
            unsafe.close(unsafe.voidPromise());
            return;
        }

        try {
            int readyOps = k.readyOps();
            // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
            // to a spin loop
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                unsafe.read();
                if (!ch.isOpen()) {
                    // Connection already closed - no need to handle write.
                    return;
                }
            }
            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
                ch.unsafe().forceFlush();
            }
            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
                // See https://github.com/netty/netty/issues/924
                int ops = k.interestOps();
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);

                unsafe.finishConnect();
            }
        } catch (CancelledKeyException ignored) {
            unsafe.close(unsafe.voidPromise());
        }
    }

1. Get the unsafe object of the server channel.
2. The event on the current SelectionKey is SelectionKey.OP_ACCEPT, so the unsafe object's read method is called.
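The condition that routes an OP_ACCEPT event to read() is a bitmask test on readyOps. The small sketch below isolates it; the class and method names (`ReadyOpsSketch`, `triggersRead`) are hypothetical, while the expression itself is copied from the listing above.

```java
import java.nio.channels.SelectionKey;

/** Isolates the readyOps test that guards unsafe.read(). */
final class ReadyOpsSketch {
    /** True for OP_READ, OP_ACCEPT, or a readyOps of 0 (the spin-loop workaround). */
    static boolean triggersRead(int readyOps) {
        return (readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0;
    }

    public static void main(String[] args) {
        System.out.println("OP_ACCEPT -> " + triggersRead(SelectionKey.OP_ACCEPT));
        System.out.println("OP_WRITE  -> " + triggersRead(SelectionKey.OP_WRITE));
    }
}
```

This is why accepting a connection goes through a method named read: from the event loop's point of view, a new connection on a server channel is just an inbound message to be read.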

This read method is defined in the NioMessageUnsafe class:

    private final List<Object> readBuf = new ArrayList<Object>();

    @Override
    public void read() {
        assert eventLoop().inEventLoop();
        final ChannelConfig config = config();
        if (!config.isAutoRead() && !isReadPending()) {
            // ChannelConfig.setAutoRead(false) was called in the meantime
            removeReadOp();
            return;
        }

        final int maxMessagesPerRead = config.getMaxMessagesPerRead();
        final ChannelPipeline pipeline = pipeline();
        boolean closed = false;
        Throwable exception = null;
        try {
            try {
                for (;;) {
                    int localRead = doReadMessages(readBuf);
                    if (localRead == 0) {
                        break;
                    }
                    if (localRead < 0) {
                        closed = true;
                        break;
                    }

                    // stop reading and remove op
                    if (!config.isAutoRead()) {
                        break;
                    }

                    if (readBuf.size() >= maxMessagesPerRead) {
                        break;
                    }
                }
            } catch (Throwable t) {
                exception = t;
            }
            setReadPending(false);
            int size = readBuf.size();
            for (int i = 0; i < size; i++) {
                pipeline.fireChannelRead(readBuf.get(i));
            }

            readBuf.clear();
            pipeline.fireChannelReadComplete();

            if (exception != null) {
                if (exception instanceof IOException && !(exception instanceof PortUnreachableException)) {
                    // ServerChannel should not be closed even on IOException because it can often continue
                    // accepting incoming connections. (e.g. too many open files)
                    closed = !(AbstractNioMessageChannel.this instanceof ServerChannel);
                }

                pipeline.fireExceptionCaught(exception);
            }

            if (closed) {
                if (isOpen()) {
                    close(voidPromise());
                }
            }
        } finally {
            // Check if there is a readPending which was not processed yet.
            // This could be for two reasons:
            // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
            // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
            //
            // See https://github.com/netty/netty/issues/2254
            if (!config.isAutoRead() && !isReadPending()) {
                removeReadOp();
            }
        }
    }

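1. readBuf collects the accepted client NioSocketChannel instances; by default, at most 16 are gathered per read() call.
2. The doReadMessages method performs the accept operation on the ServerSocketChannel.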
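The batching described in item 1 can be sketched without any networking. In the example below, a queue of strings stands in for the connections waiting to be accepted, and the local `doReadMessages` is a hypothetical stand-in that moves at most one pending item into the buffer per call. The names `ReadBatchSketch` and `readBatch` are assumptions; only the loop structure follows the read() listing above.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Sketch of the "read until empty or until the per-read limit" loop. */
final class ReadBatchSketch {
    /** Hypothetical stand-in: moves at most one pending item into buf and returns the count added. */
    static int doReadMessages(Queue<String> pending, List<Object> buf) {
        String next = pending.poll();
        if (next == null) {
            return 0;
        }
        buf.add(next);
        return 1;
    }

    /** Collects items until nothing is pending or maxMessagesPerRead is reached. */
    static List<Object> readBatch(Queue<String> pending, int maxMessagesPerRead) {
        List<Object> readBuf = new ArrayList<>();
        for (;;) {
            int localRead = doReadMessages(pending, readBuf);
            if (localRead == 0) {
                break; // nothing left to read right now
            }
            if (readBuf.size() >= maxMessagesPerRead) {
                break; // cap the batch so one channel cannot monopolize the event loop
            }
        }
        return readBuf;
    }

    public static void main(String[] args) {
        Queue<String> pending = new ArrayDeque<>();
        for (int i = 0; i < 20; i++) {
            pending.add("conn-" + i);
        }
        System.out.println("first batch:  " + readBatch(pending, 16).size());
        System.out.println("second batch: " + readBatch(pending, 16).size());
    }
}
```

Items left over after a capped batch are not lost: they stay pending and are picked up the next time the selector reports the channel as ready.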
