Netty原始碼--accept連線
上一節在Netty server啟動的過程中,我們已經清楚了server啟動時會為每個Channel分配一個NioEventLoop,NioEventLoop中有一個run方法主要用來監聽事件和執行佇列中的任務。本篇文章我們主要關心這個run方法是怎麼監聽事件,監聽完事件後又是如何處理的。
@Override protected void run() { for (;;) { try { switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) { case SelectStrategy.CONTINUE: continue; case SelectStrategy.SELECT: select(wakenUp.getAndSet(false)); if (wakenUp.get()) { selector.wakeup(); } // fall through default: } if (ioRatio == 100) { try { processSelectedKeys(); } finally { // Ensure we always run tasks. runAllTasks(); } } else { } catch (Throwable t) { } } }
一些細節我們省去,主要分析processSelectedKeys這個方法是怎麼監聽事件的,看下這個方法的底層呼叫。
private void processSelectedKeys() { if (selectedKeys != null) { processSelectedKeysOptimized(selectedKeys.flip()); } else { processSelectedKeysPlain(selector.selectedKeys()); } } private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) { for (int i = 0;; i ++) { final SelectionKey k = selectedKeys[i]; if (k == null) { break; } selectedKeys[i] = null; final Object a = k.attachment(); if (a instanceof AbstractNioChannel) { processSelectedKey(k, (AbstractNioChannel) a); } else { @SuppressWarnings("unchecked") NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a; processSelectedKey(k, task); } } }
processSelectedKeysOptimized這個方法裡有一個for迴圈來遍歷註冊到Selector上的SelectionKey,通過SelectionKey拿到註冊到Selector上的Channel,本文分析的是accept連線過程,因此這個SelectionKey上的channel型別對應的其實就是NioServerSocketChannel。拿到這個channel之後繼續呼叫processSelectedKey這個方法。
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe(); .... try { int readyOps = k.readyOps(); // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise // the IO/">NIO JDK channel implementation may throw a NotYetConnectedException. if ((readyOps & SelectionKey.OP_CONNECT) != 0) { // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking // See https://github.com/netty/netty/issues/924 int ops = k.interestOps(); ops &= ~SelectionKey.OP_CONNECT; k.interestOps(ops); unsafe.finishConnect(); } // Process OP_WRITE first as we may be able to write some queued buffers and so free memory. if ((readyOps & SelectionKey.OP_WRITE) != 0) { // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write ch.unsafe().forceFlush(); } // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead // to a spin loop if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read(); if (!ch.isOpen()) { // Connection already closed - no need to handle write. return; } } } catch (CancelledKeyException ignored) { unsafe.close(unsafe.voidPromise()); } }
通過讀取SelectionKey的readyOps方法獲取selector上監聽的事件型別,我們看到最後一個if條件判斷其實就是當監聽到的事件型別是OP_READ或者OP_ACCEPT時就呼叫unsafe.read()這個方法,所以當收到一個連線請求時,就會呼叫這個方法來處理,首先看下unsafe是在哪初始化的,看下面這句程式碼。
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe(); .... }
ch的型別的NioServerSocketChannel,通過追溯程式碼,發現是在server啟動初始化建立NioServerSocketChannel物件時初始化的,這個unsafe的具體實現類是NioMessageUnsafe類,我們看下這個類中read方法的具體實現。
@Override public void read() { assert eventLoop().inEventLoop(); final ChannelConfig config = config(); final ChannelPipeline pipeline = pipeline(); final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); allocHandle.reset(config); boolean closed = false; Throwable exception = null; try { try { do { int localRead = doReadMessages(readBuf); if (localRead == 0) { break; } if (localRead < 0) { closed = true; break; } allocHandle.incMessagesRead(localRead); } while (allocHandle.continueReading()); } catch (Throwable t) { exception = t; } int size = readBuf.size(); for (int i = 0; i < size; i ++) { readPending = false; pipeline.fireChannelRead(readBuf.get(i)); } } } }
看下doReadMessages這個方法。
@Override protected int doReadMessages(List<Object> buf) throws Exception { SocketChannel ch = javaChannel().accept(); try { if (ch != null) { buf.add(new NioSocketChannel(this, ch)); return 1; } } catch (Throwable t) { logger.warn("Failed to create a new channel from an accepted socket.", t); try { ch.close(); } catch (Throwable t2) { logger.warn("Failed to close a socket.", t2); } } return 0; }
第一句就是呼叫Channel的accept方法,因為呼叫這個方法時已經收到客戶端的連線請求了,所以呼叫accept方法後會返回一個SocketChannel物件,而不是一直阻塞等待客戶端連線,這也就是NIO執行緒模型的好處,最後將這個SocketChannel物件放在buf這個list集合中。繼續回到上面NioMessageUnsafe的read方法實現。
@Override public void read() { try { try { do { int localRead = doReadMessages(readBuf); if (localRead == 0) { break; } if (localRead < 0) { closed = true; break; } allocHandle.incMessagesRead(localRead); } while (allocHandle.continueReading()); } catch (Throwable t) { exception = t; } int size = readBuf.size(); for (int i = 0; i < size; i ++) { readPending = false; pipeline.fireChannelRead(readBuf.get(i)); } } } }
呼叫doReadMessages將SocketChannel物件放到readBuf這個list集合後,遍歷這個list集合,呼叫pipeline.fireChannelRead(readBuf.get(i))這個方法,看下這個方法接下來會發生什麼。
@Override public final ChannelPipeline fireChannelRead(Object msg) { AbstractChannelHandlerContext.invokeChannelRead(head, msg); return this; } static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) { final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeChannelRead(m); } else { executor.execute(new Runnable() { @Override public void run() { next.invokeChannelRead(m); } }); } }
首先呼叫DefaultChannelPipeline的fireChannelRead方法,然後在這個方法裡呼叫AbstractChannelHandlerContext.invokeChannelRead(head, msg),這裡的head是一個AbstractChannelHandlerContext物件,msg是呼叫accept方法返回的一個SocketChannel物件,此後進入AbstractChannelHandlerContext物件中的invokeChannelRead方法。
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) { final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeChannelRead(m); } else { executor.execute(new Runnable() { @Override public void run() { next.invokeChannelRead(m); } }); } }
這個方法裡繼續呼叫下面這個方法。
private void invokeChannelRead(Object msg) { if (invokeHandler()) { try { ((ChannelInboundHandler) handler()).channelRead(this, msg); } catch (Throwable t) { notifyHandlerException(t); } } else { fireChannelRead(msg); } } @Override public ChannelHandler handler() { return handler; }
其中,try語句中的handler()這個方法返回的是一個ChannelHandler物件,這個handler是在之前server啟動初始化時的init方法中完成初始化的。
p.addLast(new ChannelInitializer<Channel>() { @Override public void initChannel(Channel ch) throws Exception { final ChannelPipeline pipeline = ch.pipeline(); ChannelHandler handler = config.handler(); if (handler != null) { pipeline.addLast(handler); } ch.eventLoop().execute(new Runnable() { @Override public void run() { pipeline.addLast(new ServerBootstrapAcceptor( currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); } });
這個handler對應的就是ServerBootstrapAcceptor這個handler,繼續回到上面那個方法,拿到這個handler後呼叫ServerBootstrapAcceptor這個類的channelRead方法。
@Override @SuppressWarnings("unchecked") public void channelRead(ChannelHandlerContext ctx, Object msg) { final Channel child = (Channel) msg; child.pipeline().addLast(childHandler); for (Entry<ChannelOption<?>, Object> e: childOptions) { try { if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) { logger.warn("Unknown channel option: " + e); } } catch (Throwable t) { logger.warn("Failed to set a channel option: " + child, t); } } for (Entry<AttributeKey<?>, Object> e: childAttrs) { child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue()); } try { childGroup.register(child).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { forceClose(child, future.cause()); } } }); } catch (Throwable t) { forceClose(child, t); } }
child這個Channel就是呼叫accept方法返回的一個NioSocketChannel物件,然後將childHandler放到這個Channel物件的pipeline中,並對這個Channel物件完成引數初始化。
這個方法中比較重要的是childGroup.register(child)這個方法,這個方法和之前在將server啟動初始化那一節作用一樣,這裡是將NioSocketChannel這個Channel註冊到childGroup中的一個NioEventLoop中,然後在NioEventLoop中監聽這個channel的讀事件和寫事件。
到這裡,accept連線過程就結束了,其實總結起來很簡單。
(1)在NioServerSocketChannel對應的NioEventLoop中監聽連線事件,監聽到連線事件後,返回一個NioSocketChannel物件。
(2)將這個Channel物件註冊到childGroup中,由childGroup中的NioEventLoop完成這個channel上的IO事件監聽。
講完了accept連線過程,下一節就分析下客戶端傳送訊息到服務端時整個的訊息處理過程。