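Netty Analysis (1): The Server Startup Flow
If you are not yet familiar with socket programming in plain Java NIO, read the prerequisite post first.
A Simple Demo Program
First, here is the server code of the echo example that ships with Netty: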
```java
/*
 * Copyright 2012 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License,
 * version 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at:
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
package io.netty.example.echo;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.SelfSignedCertificate;

/**
 * Echoes back any received data from a client.
 */
public final class EchoServer {

    static final boolean SSL = System.getProperty("ssl") != null;
    static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));

    public static void main(String[] args) throws Exception {
        // Configure SSL.
        final SslContext sslCtx;
        if (SSL) {
            SelfSignedCertificate ssc = new SelfSignedCertificate();
            sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
        } else {
            sslCtx = null;
        }

        // Configure the server.
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        final EchoServerHandler serverHandler = new EchoServerHandler();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class)
             .option(ChannelOption.SO_BACKLOG, 100)
             .handler(new LoggingHandler(LogLevel.INFO))
             .childHandler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ChannelPipeline p = ch.pipeline();
                     if (sslCtx != null) {
                         p.addLast(sslCtx.newHandler(ch.alloc()));
                     }
                     //p.addLast(new LoggingHandler(LogLevel.INFO));
                     p.addLast(serverHandler);
                 }
             });

            // Start the server.
            ChannelFuture f = b.bind(PORT).sync();

            // Wait until the server socket is closed.
            f.channel().closeFuture().sync();
        } finally {
            // Shut down all event loops to terminate all threads.
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
```
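The code is concise, but hard to follow at first, because the classes it uses look very different from raw NIO programming. Let's go through them briefly.
```java
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup);
```
This creates a ServerBootstrap, the startup helper class, and assigns the boss and worker thread groups.
```java
b.channel(NioServerSocketChannel.class);
```
This sets the channel type. Internally, a ServerBootstrapChannelFactory is created to hold the class, and that factory later instantiates the channel.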
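Here is a minimal JDK-only sketch of what "a factory that stores the class and creates the object later" means. `ClassChannelFactory` and `DemoChannel` are hypothetical names for illustration; this is not Netty's API. As far as I know, the comparable class in Netty 4.x is `ReflectiveChannelFactory`.

```java
import java.lang.reflect.Constructor;

public class ClassChannelFactoryDemo {
    // Hypothetical stand-in for a channel type such as NioServerSocketChannel.
    public static class DemoChannel {
        public DemoChannel() { }
    }

    // Keeps the Class passed to channel(...) and creates a fresh instance on every call.
    static final class ClassChannelFactory<T> {
        private final Constructor<? extends T> ctor;

        ClassChannelFactory(Class<? extends T> clazz) {
            try {
                // Resolve the public no-arg constructor once, up front.
                this.ctor = clazz.getConstructor();
            } catch (NoSuchMethodException e) {
                throw new IllegalArgumentException(clazz + " has no public no-arg constructor", e);
            }
        }

        T newChannel() {
            try {
                return ctor.newInstance();
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("Unable to create channel from " + ctor.getDeclaringClass(), e);
            }
        }
    }

    public static void main(String[] args) {
        ClassChannelFactory<DemoChannel> factory = new ClassChannelFactory<>(DemoChannel.class);
        DemoChannel a = factory.newChannel();
        DemoChannel b = factory.newChannel();
        System.out.println(a != b); // each call returns a new object
    }
}
```

The bootstrap only needs the class at configuration time. The object itself is created much later, during bind.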
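```java
b.option(ChannelOption.SO_BACKLOG, 100);
```
This sets an option on the server's listening channel. It does not apply to accepted client connections; those are configured with childOption. SO_BACKLOG is the length of the queue of pending connections that have not yet been accepted.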
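At the JDK level, the backlog is simply the second argument of `ServerSocketChannel.bind`. The sketch below shows that call on its own, with no Netty involved. My assumption (not shown in this post) is that Netty's NIO server channel passes the configured SO_BACKLOG value to this same call. `BacklogDemo` and `listen` are hypothetical names.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;

public class BacklogDemo {
    // Opens a listening channel on an ephemeral loopback port with the given accept backlog.
    static ServerSocketChannel listen(int backlog) throws IOException {
        ServerSocketChannel server = ServerSocketChannel.open();
        server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0), backlog);
        return server;
    }

    public static void main(String[] args) throws IOException {
        try (ServerSocketChannel server = listen(100)) {
            System.out.println("listening on " + server.getLocalAddress());
        }
    }
}
```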
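```java
b.childHandler(new ChannelInitializer<SocketChannel>() {
    @Override
    public void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline p = ch.pipeline();
        if (sslCtx != null) {
            p.addLast(sslCtx.newHandler(ch.alloc()));
        }
        //p.addLast(new LoggingHandler(LogLevel.INFO));
        p.addLast(serverHandler);
    }
});
```
This sets the initialization logic that is applied to each SocketChannel once a client connection has been established.
All of the code above merely assigns parameters to the ServerBootstrap object. What actually gets Netty running is the following line:
```java
ChannelFuture f = b.bind(PORT).sync();
```
Before reading this code, let's leave it as a cliffhanger and first get to know another class: NioEventLoop. Once you understand NioEventLoop, Netty's threading model becomes clear and the rest of the analysis takes much less effort.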
NioEventLoop
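NioEventLoop is the most important object that interacts with the JDK's NIO layer. It is created inside a NioEventLoopGroup.
NioEventLoopGroup has an internal array named children. Think of it as a ring joined head to tail: every call to NioEventLoopGroup.next() returns the next element of the ring, and each element is a NioEventLoop.
What determines the size of children? The number of threads passed to the NioEventLoopGroup constructor. If no number (or 0) is passed, as with workerGroup in the demo, Netty defaults to twice the number of available processors.
Now let's look at how NioEventLoop is implemented, starting with its constructor: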
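The "ring" behind next() can be sketched with an index that wraps around. `RoundRobinGroup` is a hypothetical class, not Netty's implementation. Netty has its own chooser classes, and for power-of-two sizes they use a bit mask instead of a modulo. The fallback to 2 × processors mirrors the default described above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntFunction;

public class RoundRobinGroup<T> {
    private final List<T> children;
    private final AtomicInteger idx = new AtomicInteger();

    // nThreads == 0 falls back to 2 * available processors.
    public RoundRobinGroup(int nThreads, IntFunction<T> childFactory) {
        if (nThreads < 0) throw new IllegalArgumentException("nThreads: " + nThreads);
        int n = nThreads == 0 ? Runtime.getRuntime().availableProcessors() * 2 : nThreads;
        children = new ArrayList<>(n);
        for (int i = 0; i < n; i++) children.add(childFactory.apply(i));
    }

    public int size() { return children.size(); }

    // Walks the children as a ring: 0, 1, ..., n-1, 0, 1, ...
    // floorMod keeps the index valid even after the counter overflows.
    public T next() {
        return children.get(Math.floorMod(idx.getAndIncrement(), children.size()));
    }

    public static void main(String[] args) {
        RoundRobinGroup<String> boss = new RoundRobinGroup<>(1, i -> "boss-" + i);
        RoundRobinGroup<String> workers = new RoundRobinGroup<>(3, i -> "worker-" + i);
        System.out.println(boss.next() + " " + boss.next()); // boss-0 boss-0
        for (int i = 0; i < 4; i++) System.out.print(workers.next() + " "); // worker-0 worker-1 worker-2 worker-0
        System.out.println();
    }
}
```

A boss group of size 1, as in the demo, always hands out the same loop.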
```java
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider) {
    super(parent, executor, false);
    if (selectorProvider == null) {
        throw new NullPointerException("selectorProvider");
    }
    provider = selectorProvider;
    selector = openSelector();
}
```
Here is the implementation of openSelector():
```java
private Selector openSelector() {
    final Selector selector;
    try {
        selector = provider.openSelector();
    } catch (IOException e) {
        throw new ChannelException("failed to open a new selector", e);
    }

    if (DISABLE_KEYSET_OPTIMIZATION) {
        return selector;
    }

    try {
        SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();

        Class<?> selectorImplClass =
                Class.forName("sun.nio.ch.SelectorImpl", false, ClassLoader.getSystemClassLoader());

        // Ensure the current selector implementation is what we can instrument.
        if (!selectorImplClass.isAssignableFrom(selector.getClass())) {
            return selector;
        }

        Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
        Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");

        selectedKeysField.setAccessible(true);
        publicSelectedKeysField.setAccessible(true);

        selectedKeysField.set(selector, selectedKeySet);
        publicSelectedKeysField.set(selector, selectedKeySet);

        selectedKeys = selectedKeySet;
        logger.trace("Instrumented an optimized java.util.Set into: {}", selector);
    } catch (Throwable t) {
        selectedKeys = null;
        logger.trace("Failed to instrument an optimized java.util.Set into: {}", selector, t);
    }

    return selector;
}
```
openSelector() calls provider.openSelector() to create a multiplexing Selector.
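In the JDK's NIO implementation, the selector keeps an internal HashSet named selectedKeys, which holds the result set after each select call. Unless the optimization is disabled (DISABLE_KEYSET_OPTIMIZATION), Netty uses reflection to replace the selector's selectedKeys and publicSelectedKeys fields with its own SelectedSelectionKeySet object. This has two benefits:
- After each select call, Netty can read its own object directly to see which new events occurred.
- SelectedSelectionKeySet is backed by an array rather than a HashSet, so adding and iterating over ready keys is cheaper.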
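The following sketch shows an array-backed Set in the spirit of SelectedSelectionKeySet. It is a simplification: `ArrayKeySet`, `array()` and `reset()` are hypothetical names, and Netty's real class differs in detail. It keeps at least one trailing null slot, so a consumer can stop at the first null, the same way processSelectedKeysOptimized does.

```java
import java.util.AbstractSet;
import java.util.Arrays;
import java.util.Iterator;
import java.util.NoSuchElementException;

// Only add, size, iteration and reset are cheap here; contains falls back to
// AbstractSet's linear scan and remove is unsupported.
public class ArrayKeySet<E> extends AbstractSet<E> {
    private Object[] keys = new Object[4];
    private int size;

    @Override
    public boolean add(E e) {
        if (e == null) return false;
        keys[size++] = e;
        // Grow as soon as the array is full, so a null slot always follows the last key.
        if (size == keys.length) keys = Arrays.copyOf(keys, size << 1);
        return true;
    }

    @Override
    public int size() { return size; }

    // Exposes the backing array; the first null marks the end of the batch.
    public Object[] array() { return keys; }

    // Clears the batch so the next select() fills the array from index 0 again.
    public void reset() {
        Arrays.fill(keys, 0, size, null);
        size = 0;
    }

    @Override
    public Iterator<E> iterator() {
        return new Iterator<E>() {
            private int i;

            @Override
            public boolean hasNext() { return i < size; }

            @SuppressWarnings("unchecked")
            @Override
            public E next() {
                if (!hasNext()) throw new NoSuchElementException();
                return (E) keys[i++];
            }
        };
    }

    public static void main(String[] args) {
        ArrayKeySet<String> set = new ArrayKeySet<>();
        for (String k : new String[] {"a", "b", "c", "d"}) set.add(k);
        Object[] batch = set.array();
        for (int i = 0; batch[i] != null; i++) System.out.println(batch[i]); // a, b, c, d
        set.reset();
    }
}
```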
Other threads can submit tasks by calling NioEventLoop's execute method. Here is its implementation:
```java
public void execute(Runnable task) {
    if (task == null) {
        throw new NullPointerException("task");
    }

    boolean inEventLoop = inEventLoop();
    if (inEventLoop) {
        addTask(task);
    } else {
        startThread();
        addTask(task);
        if (isShutdown() && removeTask(task)) {
            reject();
        }
    }

    if (!addTaskWakesUp) {
        wakeup(inEventLoop);
    }
}

private void startThread() {
    synchronized (stateLock) {
        if (state == ST_NOT_STARTED) {
            state = ST_STARTED;
            delayedTaskQueue.add(new ScheduledFutureTask<Void>(
                    this, delayedTaskQueue, Executors.<Void>callable(new PurgeTask(), null),
                    ScheduledFutureTask.deadlineNanos(SCHEDULE_PURGE_INTERVAL), -SCHEDULE_PURGE_INTERVAL));
            doStartThread();
        }
    }
}
```
NioEventLoop keeps a state variable named state. It guarantees that doStartThread runs only once, however many times startThread is called. On that first call, doStartThread creates the new thread. Here is doStartThread:
```java
private void doStartThread() {
    assert thread == null;
    executor.execute(new Runnable() {
        @Override
        public void run() {
            thread = Thread.currentThread();
            if (interrupted) {
                thread.interrupt();
            }

            boolean success = false;
            updateLastExecutionTime();
            try {
                SingleThreadEventExecutor.this.run();
                success = true;
            } catch (Throwable t) {
                logger.warn("Unexpected exception from an event executor: ", t);
            } finally {
                if (state < ST_SHUTTING_DOWN) {
                    state = ST_SHUTTING_DOWN;
                }

                // Check if confirmShutdown() was called at the end of the loop.
                if (success && gracefulShutdownStartTime == 0) {
                    logger.error(
                            "Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
                            SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " +
                            "before run() implementation terminates.");
                }

                try {
                    // Run all remaining tasks and shutdown hooks.
                    for (;;) {
                        if (confirmShutdown()) {
                            break;
                        }
                    }
                } finally {
                    try {
                        cleanup();
                    } finally {
                        synchronized (stateLock) {
                            state = ST_TERMINATED;
                        }
                        threadLock.release();
                        if (!taskQueue.isEmpty()) {
                            logger.warn(
                                    "An event executor terminated with " +
                                    "non-empty task queue (" + taskQueue.size() + ')');
                        }

                        terminationFuture.setSuccess(null);
                    }
                }
            }
        }
    });
}
```
The key player is executor. Here is its default implementation:
```java
public final class ThreadPerTaskExecutor implements Executor {
    private final ThreadFactory threadFactory;

    public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
        if (threadFactory == null) {
            throw new NullPointerException("threadFactory");
        }
        this.threadFactory = threadFactory;
    }

    @Override
    public void execute(Runnable command) {
        threadFactory.newThread(command).start();
    }
}
```
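As you can see, every call to executor.execute creates a new thread. But doStartThread is only called once, so each NioEventLoop creates exactly one thread.
The new thread ends up calling SingleThreadEventExecutor.this.run(). We are now very close to the answer. Here is the implementation of run: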
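The "start exactly one thread on first execute" behavior can be modeled in a few lines. This is a hypothetical simplification: `LazyLoop` has no selector, scheduled tasks or shutdown logic. It also uses an `AtomicBoolean` compare-and-set where the quoted code uses a lock plus the state field. The executor mirrors ThreadPerTaskExecutor: every call starts a new thread, but the CAS ensures it is called only once.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class LazyLoop {
    private final AtomicInteger threadsCreated = new AtomicInteger();

    // Like ThreadPerTaskExecutor: every execute() call starts a brand-new thread.
    private final Executor executor = command -> {
        threadsCreated.incrementAndGet();
        Thread t = new Thread(command, "lazy-loop");
        t.setDaemon(true); // let the JVM exit even though the loop never returns
        t.start();
    };

    private final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();
    private final AtomicBoolean started = new AtomicBoolean();
    private volatile Thread thread;

    public boolean inEventLoop() { return Thread.currentThread() == thread; }

    public int threadsCreated() { return threadsCreated.get(); }

    public void execute(Runnable task) {
        taskQueue.add(task);
        // Only the caller that flips false -> true starts the thread (cf. state == ST_NOT_STARTED).
        if (started.compareAndSet(false, true)) {
            executor.execute(this::runLoop);
        }
    }

    private void runLoop() {
        thread = Thread.currentThread();
        try {
            for (;;) {
                taskQueue.take().run(); // a real NioEventLoop would also select() here
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        LazyLoop loop = new LazyLoop();
        CountDownLatch done = new CountDownLatch(3);
        for (int i = 0; i < 3; i++) {
            int n = i;
            loop.execute(() -> {
                System.out.println("task " + n + " on " + Thread.currentThread().getName());
                done.countDown();
            });
        }
        done.await();
        System.out.println("threads created: " + loop.threadsCreated()); // 1
    }
}
```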
```java
protected void run() {
    for (;;) {
        oldWakenUp = wakenUp.getAndSet(false);
        try {
            if (hasTasks()) {
                selectNow();
            } else {
                select();

                // 'wakenUp.compareAndSet(false, true)' is always evaluated
                // before calling 'selector.wakeup()' to reduce the wake-up
                // overhead. (Selector.wakeup() is an expensive operation.)
                //
                // However, there is a race condition in this approach.
                // The race condition is triggered when 'wakenUp' is set to
                // true too early.
                //
                // 'wakenUp' is set to true too early if:
                // 1) Selector is waken up between 'wakenUp.set(false)' and
                //    'selector.select(...)'. (BAD)
                // 2) Selector is waken up between 'selector.select(...)' and
                //    'if (wakenUp.get()) { ... }'. (OK)
                //
                // In the first case, 'wakenUp' is set to true and the
                // following 'selector.select(...)' will wake up immediately.
                // Until 'wakenUp' is set to false again in the next round,
                // 'wakenUp.compareAndSet(false, true)' will fail, and therefore
                // any attempt to wake up the Selector will fail, too, causing
                // the following 'selector.select(...)' call to block
                // unnecessarily.
                //
                // To fix this problem, we wake up the selector again if wakenUp
                // is true immediately after selector.select(...).
                // It is inefficient in that it wakes up the selector for both
                // the first case (BAD - wake-up required) and the second case
                // (OK - no wake-up required).

                if (wakenUp.get()) {
                    selector.wakeup();
                }
            }

            cancelledKeys = 0;

            final long ioStartTime = System.nanoTime();
            needsToSelectAgain = false;
            if (selectedKeys != null) {
                processSelectedKeysOptimized(selectedKeys.flip());
            } else {
                processSelectedKeysPlain(selector.selectedKeys());
            }
            final long ioTime = System.nanoTime() - ioStartTime;

            final int ioRatio = this.ioRatio;
            runAllTasks(ioTime * (100 - ioRatio) / ioRatio);

            if (isShuttingDown()) {
                closeAll();
                if (confirmShutdown()) {
                    break;
                }
            }
        } catch (Throwable t) {
            logger.warn("Unexpected exception in the selector loop.", t);

            // Prevent possible consecutive immediate failures that lead to
            // excessive CPU consumption.
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // Ignore.
            }
        }
    }
}
```
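run is an infinite loop. Each round does two things:
- It calls the Selector's select function to collect events and then handles them.
- It runs the tasks that other threads have put into its queue.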
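The time given to queued tasks is not fixed. It is derived from how long the I/O phase took, using the expression `ioTime * (100 - ioRatio) / ioRatio` in the run loop quoted above. The small worked example below evaluates that expression. Netty's default ioRatio is 50, so tasks get roughly as much time as I/O just used. `IoRatioBudget` is a hypothetical helper.

```java
public class IoRatioBudget {
    // Budget handed to runAllTasks(...) after one round of I/O,
    // following ioTime * (100 - ioRatio) / ioRatio from run().
    static long taskBudgetNanos(long ioTimeNanos, int ioRatio) {
        if (ioRatio <= 0 || ioRatio > 100) throw new IllegalArgumentException("ioRatio: " + ioRatio);
        return ioTimeNanos * (100 - ioRatio) / ioRatio;
    }

    public static void main(String[] args) {
        long io = 1_000_000L; // pretend processing the selected keys took 1 ms
        for (int ratio : new int[] {50, 80, 20}) {
            // 50 -> 1000000 ns, 80 -> 250000 ns, 20 -> 4000000 ns
            System.out.println("ioRatio=" + ratio + " -> tasks may run for " + taskBudgetNanos(io, ratio) + " ns");
        }
    }
}
```

A higher ioRatio favors I/O: at 80, tasks get a quarter of the I/O time. At 20, they get four times as much.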
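1. select()/selectNow(): inside these functions, the native Selector's select method is called, so the first step of the NIO flow is now out in the open. Following the NIO call sequence (the full code is in the earlier post), the next step should be the ServerSocketChannel calling accept to accept client connections. Let's find it.
2. If events occurred after select, selectedKeys changes. Note that the selectedKeys variable is actually a reference to the underlying Selector's set of triggered keys. Processing then moves into processSelectedKeysOptimized: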
```java
private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
    for (int i = 0;; i ++) {
        final SelectionKey k = selectedKeys[i];
        if (k == null) {
            break;
        }

        final Object a = k.attachment();

        if (a instanceof AbstractNioChannel) {
            processSelectedKey(k, (AbstractNioChannel) a);
        } else {
            @SuppressWarnings("unchecked")
            NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
            processSelectedKey(k, task);
        }

        if (needsToSelectAgain) {
            selectAgain();
            // Need to flip the optimized selectedKeys to get the right reference to the array
            // and reset the index to -1 which will then set to 0 on the for loop
            // to start over again.
            //
            // See https://github.com/netty/netty/issues/1523
            selectedKeys = this.selectedKeys.flip();
            i = -1;
        }
    }
}
```
Next, processSelectedKey:
```java
private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final NioUnsafe unsafe = ch.unsafe();
    if (!k.isValid()) {
        // close the channel if the key is not valid anymore
        unsafe.close(unsafe.voidPromise());
        return;
    }

    try {
        int readyOps = k.readyOps();
        // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
        // to a spin loop
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            unsafe.read();
            if (!ch.isOpen()) {
                // Connection already closed - no need to handle write.
                return;
            }
        }
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
            ch.unsafe().forceFlush();
        }
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
            // See https://github.com/netty/netty/issues/924
            int ops = k.interestOps();
            ops &= ~SelectionKey.OP_CONNECT;
            k.interestOps(ops);

            unsafe.finishConnect();
        }
    } catch (CancelledKeyException e) {
        unsafe.close(unsafe.voidPromise());
    }
}
```
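The bit tests in processSelectedKey can be exercised on their own with the JDK's `SelectionKey` constants. The hypothetical `ReadyOpsDemo.dispatch` below follows the same branch order and returns the names of the actions the real method would take. It deliberately skips the early return when the channel is closed after reading.

```java
import java.nio.channels.SelectionKey;
import java.util.ArrayList;
import java.util.List;

public class ReadyOpsDemo {
    // Same bit tests and order as processSelectedKey(...); returns action names instead of doing I/O.
    static List<String> dispatch(int readyOps) {
        List<String> actions = new ArrayList<>();
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            actions.add("unsafe.read()");
        }
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            actions.add("forceFlush()");
        }
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            actions.add("finishConnect()");
        }
        return actions;
    }

    public static void main(String[] args) {
        System.out.println("OP_ACCEPT = " + SelectionKey.OP_ACCEPT); // 16, i.e. 1 << 4
        System.out.println(dispatch(SelectionKey.OP_ACCEPT)); // [unsafe.read()]
        System.out.println(dispatch(SelectionKey.OP_READ | SelectionKey.OP_WRITE)); // [unsafe.read(), forceFlush()]
    }
}
```

On a server channel, OP_ACCEPT shares the read branch with OP_READ. That is why accepting a connection goes through unsafe.read().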
When a client connects, readyOps should be 16, which corresponds to SelectionKey.OP_ACCEPT. If OP_READ fires instead, Netty reads the client's data; that path is analyzed in detail in the next post. Let's look further at doReadMessages, which is called from unsafe.read():
```java
public void read() {
    assert eventLoop().inEventLoop();
    if (!config().isAutoRead()) {
        removeReadOp();
    }

    final ChannelConfig config = config();
    final int maxMessagesPerRead = config.getMaxMessagesPerRead();
    final boolean autoRead = config.isAutoRead();
    final ChannelPipeline pipeline = pipeline();
    boolean closed = false;
    Throwable exception = null;
    try {
        for (;;) {
            int localRead = doReadMessages(readBuf);
            if (localRead == 0) {
                break;
            }
            if (localRead < 0) {
                closed = true;
                break;
            }

            if (readBuf.size() >= maxMessagesPerRead | !autoRead) {
                break;
            }
        }
    } catch (Throwable t) {
        exception = t;
    }

    int size = readBuf.size();
    for (int i = 0; i < size; i ++) {
        pipeline.fireChannelRead(readBuf.get(i));
    }
    readBuf.clear();
    pipeline.fireChannelReadComplete();

    if (exception != null) {
        if (exception instanceof IOException) {
            // ServerChannel should not be closed even on IOException because it can often continue
            // accepting incoming connections. (e.g. too many open files)
            closed = !(AbstractNioMessageChannel.this instanceof ServerChannel);
        }

        pipeline.fireExceptionCaught(exception);
    }

    if (closed) {
        if (isOpen()) {
            close(voidPromise());
        }
    }
}
```
```java
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
    SocketChannel ch = javaChannel().accept();

    try {
        if (ch != null) {
            buf.add(new NioSocketChannel(this, childEventLoopGroup().next(), ch));
            return 1;
        }
    } catch (Throwable t) {
        logger.warn("Failed to create a new channel from an accepted socket.", t);

        try {
            ch.close();
        } catch (Throwable t2) {
            logger.warn("Failed to close a socket.", t2);
        }
    }

    return 0;
}
```
So this is where accept is, buried quite deep. After accept, we have the socket that talks to the client. It is wrapped in a NioSocketChannel bound to a worker event loop (childEventLoopGroup().next()) and added to the list buf. Then we return to the upper-level read method.
From read, pipeline.fireChannelRead(...) passes each accepted channel along the server channel's pipeline until it reaches ServerBootstrapAcceptor.channelRead, which sets up the SocketChannel's pipeline:
```java
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    Channel child = (Channel) msg;

    child.pipeline().addLast(childHandler);

    for (Entry<ChannelOption<?>, Object> e: childOptions) {
        try {
            if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
                logger.warn("Unknown channel option: " + e);
            }
        } catch (Throwable t) {
            logger.warn("Failed to set a channel option: " + child, t);
        }
    }

    for (Entry<AttributeKey<?>, Object> e: childAttrs) {
        child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
    }

    child.unsafe().register(child.newPromise());
}
```
First comes child.pipeline().addLast(childHandler). Looks familiar, doesn't it? childHandler is the handler we passed to ServerBootstrap's childHandler method at the beginning. Next, the child options (socket properties) and attributes are applied. Finally, the channel is registered via child.unsafe().register(...).
Here is the register implementation:
```java
public final void register(final ChannelPromise promise) {
    if (eventLoop.inEventLoop()) {
        register0(promise);
    } else {
        try {
            eventLoop.execute(new Runnable() {
                @Override
                public void run() {
                    register0(promise);
                }
            });
        } catch (Throwable t) {
            logger.warn(
                    "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                    AbstractChannel.this, t);
            closeForcibly();
            closeFuture.setClosed();
            promise.setFailure(t);
        }
    }
}
```
channelRead runs on the boss thread, while the child's eventLoop belongs to workerGroup. So inEventLoop() is false, and a registration task is submitted to that eventLoop. In that NioEventLoop thread (a worker thread here), the SocketChannel is registered with the eventLoop's own selector. Note how this differs from our plain NIO code: every thread has its own Selector polling for events.
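The "one Selector per event loop" layout can be reproduced with plain JDK NIO. In this sketch, a boss Selector watches OP_ACCEPT, and the accepted SocketChannel is registered with one of several worker Selectors, chosen round-robin. It is a simplified, single-threaded assumption made only to keep the demo deterministic; in Netty, each Selector runs on its own NioEventLoop thread. `BossWorkerSelectors` and `acceptAndRead` are hypothetical names.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;

public class BossWorkerSelectors {
    // Accepts one loopback connection on the boss Selector, hands it to a worker Selector,
    // and returns how many bytes that worker read.
    static int acceptAndRead(int workerCount) throws IOException {
        if (workerCount < 1) throw new IllegalArgumentException("workerCount: " + workerCount);
        Selector boss = Selector.open();
        Selector[] workers = new Selector[workerCount];
        for (int i = 0; i < workerCount; i++) workers[i] = Selector.open();
        int nextWorker = 0; // stand-in for workerGroup.next()
        try (ServerSocketChannel server = ServerSocketChannel.open();
             SocketChannel client = SocketChannel.open()) {
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            server.configureBlocking(false);
            server.register(boss, SelectionKey.OP_ACCEPT);     // the boss only cares about new connections

            client.connect(server.getLocalAddress());          // blocking connect from the client side
            if (boss.select(2000) == 0) throw new IOException("no accept event");
            boss.selectedKeys().clear();

            try (SocketChannel child = server.accept()) {      // cf. javaChannel().accept() in doReadMessages
                if (child == null) throw new IOException("accept returned null");
                child.configureBlocking(false);
                Selector worker = workers[nextWorker++ % workerCount];
                child.register(worker, SelectionKey.OP_READ);  // the child now belongs to one worker Selector

                client.write(ByteBuffer.wrap("hi".getBytes(StandardCharsets.US_ASCII)));
                ByteBuffer buf = ByteBuffer.allocate(16);
                int total = 0;
                while (total < 2 && worker.select(2000) > 0) {
                    worker.selectedKeys().clear();
                    int n = child.read(buf);
                    if (n < 0) break;
                    total += n;
                }
                return total;
            }
        } finally {
            boss.close();
            for (Selector w : workers) w.close();
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("worker read " + acceptAndRead(2) + " bytes");
    }
}
```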
Now let's see what bind does.
```java
public ChannelFuture bind(SocketAddress localAddress) {
    validate(); // check that the parameters are valid
    if (localAddress == null) {
        throw new NullPointerException("localAddress");
    }
    return doBind(localAddress);
}
```
Next, doBind:
```java
private ChannelFuture doBind(final SocketAddress localAddress) {
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();
    if (regFuture.cause() != null) {
        return regFuture;
    }

    final ChannelPromise promise;
    if (regFuture.isDone()) {
        promise = channel.newPromise();
        doBind0(regFuture, channel, localAddress, promise);
    } else {
        // Registration future is almost always fulfilled already, but just in case it's not.
        promise = new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE);
        regFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                doBind0(regFuture, channel, localAddress, promise);
            }
        });
    }

    return promise;
}
```
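The branch at the end of doBind has two cases. If registration has already finished, bind runs immediately; otherwise the bind is deferred to a completion callback. The sketch below models that choice with the JDK's `CompletableFuture` standing in for Netty's ChannelFuture. It is a hypothetical model, not Netty code. `thenRun` skips the callback when registration fails, which is a simplification of how the real listener behaves.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class BindAfterRegister {
    static CompletableFuture<Void> doBind(CompletableFuture<Void> regFuture, List<String> log) {
        if (regFuture.isCompletedExceptionally()) {
            return regFuture;                        // registration failed: surface that failure
        }
        if (regFuture.isDone()) {
            log.add("bind immediately");             // the common case
            return CompletableFuture.completedFuture(null);
        }
        // Registration still pending on the event loop: bind once it completes.
        return regFuture.thenRun(() -> log.add("bind after registration"));
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        doBind(CompletableFuture.completedFuture(null), log);

        CompletableFuture<Void> pending = new CompletableFuture<>();
        CompletableFuture<Void> bound = doBind(pending, log);
        log.add("registration still running");
        pending.complete(null);                      // the event loop finishes register0(...)
        bound.join();
        System.out.println(log); // [bind immediately, registration still running, bind after registration]
    }
}
```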
doBind initializes a Channel and registers it with a boss thread. Let's look further at initAndRegister:
```java
final ChannelFuture initAndRegister() {
    Channel channel;
    try {
        channel = createChannel();
    } catch (Throwable t) {
        return VoidChannel.INSTANCE.newFailedFuture(t);
    }

    try {
        init(channel);
    } catch (Throwable t) {
        channel.unsafe().closeForcibly();
        return channel.newFailedFuture(t);
    }

    ChannelPromise regFuture = channel.newPromise();
    channel.unsafe().register(regFuture);
    if (regFuture.cause() != null) {
        if (channel.isRegistered()) {
            channel.close();
        } else {
            channel.unsafe().closeForcibly();
        }
    }

    // If we are here and the promise is not failed, it's one of the following cases:
    // 1) If we attempted registration from the event loop, the registration has been completed at this point.
    //    i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
    // 2) If we attempted registration from the other thread, the registration request has been successfully
    //    added to the event loop's task queue for later execution.
    //    i.e. It's safe to attempt bind() or connect() now:
    //         because bind() or connect() will be executed *after* the scheduled registration task is executed
    //         because register(), bind(), and connect() are all bound to the same thread.

    return regFuture;
}
```
It breaks down into three steps: createChannel, init and register.
```java
Channel createChannel() {
    EventLoop eventLoop = group().next();
    return channelFactory().newChannel(eventLoop, childGroup);
}
```
```java
void init(Channel channel) throws Exception {
    final Map<ChannelOption<?>, Object> options = options();
    synchronized (options) {
        channel.config().setOptions(options);
    }

    final Map<AttributeKey<?>, Object> attrs = attrs();
    synchronized (attrs) {
        for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
            @SuppressWarnings("unchecked")
            AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
            channel.attr(key).set(e.getValue());
        }
    }

    ChannelPipeline p = channel.pipeline();
    if (handler() != null) {
        p.addLast(handler());
    }

    final ChannelHandler currentChildHandler = childHandler;
    final Entry<ChannelOption<?>, Object>[] currentChildOptions;
    final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
    synchronized (childOptions) {
        currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
    }
    synchronized (childAttrs) {
        currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
    }

    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(Channel ch) throws Exception {
            ch.pipeline().addLast(new ServerBootstrapAcceptor(currentChildHandler, currentChildOptions,
                    currentChildAttrs));
        }
    });
}
```
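As createChannel shows, the Channel type set through ServerBootstrap.channel is finally put to use. A NioEventLoop from bossGroup (group().next()) is bound to the newly created channel. Why is workerGroup (childGroup) passed in as well? Because the client connections this ServerChannel accepts have to be handed to a worker for processing.
init applies the options (setOptions) and attributes, then adds the corresponding ChannelHandlers to the ServerChannel's pipeline: the handler() set on the bootstrap, plus a ChannelInitializer that adds a ServerBootstrapAcceptor carrying the child handler, child options and child attributes.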
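The ChannelInitializer added by init is a one-shot handler. When the channel is registered, it adds the real handler (here the ServerBootstrapAcceptor) and then removes itself from the pipeline. The toy pipeline below illustrates that trick. `InitializerDemo`, `Pipeline`, `Handler`, `Named` and `Initializer` are hypothetical types, not Netty's API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class InitializerDemo {
    interface Handler {
        default void registered(Pipeline p) { }
    }

    static final class Named implements Handler {
        final String name;
        Named(String name) { this.name = name; }
        @Override public String toString() { return name; }
    }

    // One-shot handler: runs initChannel when the channel is registered, then drops out.
    static final class Initializer implements Handler {
        private final Consumer<Pipeline> initChannel;
        Initializer(Consumer<Pipeline> initChannel) { this.initChannel = initChannel; }

        @Override
        public void registered(Pipeline p) {
            initChannel.accept(p);   // e.g. add a ServerBootstrapAcceptor-like handler
            p.handlers.remove(this); // remove itself afterwards
        }

        @Override public String toString() { return "Initializer"; }
    }

    static final class Pipeline {
        final List<Handler> handlers = new ArrayList<>();

        Pipeline addLast(Handler h) { handlers.add(h); return this; }

        void fireRegistered() {
            for (Handler h : new ArrayList<>(handlers)) h.registered(this); // iterate over a snapshot
        }
    }

    public static void main(String[] args) {
        Pipeline p = new Pipeline()
                .addLast(new Named("LoggingHandler"))
                .addLast(new Initializer(pl -> pl.addLast(new Named("ServerBootstrapAcceptor"))));
        System.out.println(p.handlers); // [LoggingHandler, Initializer]
        p.fireRegistered();
        System.out.println(p.handlers); // [LoggingHandler, ServerBootstrapAcceptor]
    }
}
```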
Now let's focus on the register implementation.
```java
public final void register(final ChannelPromise promise) {
    if (eventLoop.inEventLoop()) {
        register0(promise);
    } else {
        try {
            eventLoop.execute(new Runnable() {
                @Override
                public void run() {
                    register0(promise);
                }
            });
        } catch (Throwable t) {
            logger.warn(
                    "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                    AbstractChannel.this, t);
            closeForcibly();
            closeFuture.setClosed();
            promise.setFailure(t);
        }
    }
}
```
```java
private void register0(ChannelPromise promise) {
    try {
        // check if the channel is still open as it could be closed in the mean time when the register
        // call was outside of the eventLoop
        if (!ensureOpen(promise)) {
            return;
        }
        doRegister();
        registered = true;
        promise.setSuccess();
        pipeline.fireChannelRegistered();
        if (isActive()) {
            pipeline.fireChannelActive();
        }
    } catch (Throwable t) {
        // Close the channel directly to avoid FD leak.
        closeForcibly();
        closeFuture.setClosed();
        if (!promise.tryFailure(t)) {
            logger.warn(
                    "Tried to fail the registration promise, but it is complete already. " +
                    "Swallowing the cause of the registration failure:", t);
        }
    }
}
```
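Here, finally, is the call to eventLoop.execute. The caller (the user thread calling bind) is not the event loop's own thread, so the execute path is taken. From our earlier analysis of NioEventLoop, the first call creates a new thread that runs the submitted Runnable, which ends up executing the ServerChannel's registration logic in register0. Note that the promise passed in is a future: once registration succeeds, other threads can check through it whether the work has completed.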
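The register pattern works as follows: run directly if already on the loop thread, otherwise hand the work to the loop, and report the result through a promise. The sketch below captures it with JDK types only. A single-thread `ExecutorService` plays the event loop and a `CompletableFuture` plays the ChannelPromise. `RegisterOnLoop` is a hypothetical class, and register0 here only completes the promise instead of registering with a Selector.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class RegisterOnLoop {
    private final ExecutorService loop;
    private volatile Thread loopThread;

    public RegisterOnLoop() {
        loop = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "event-loop");
            t.setDaemon(true);
            loopThread = t; // remember which thread counts as "in the event loop"
            return t;
        });
    }

    boolean inEventLoop() { return Thread.currentThread() == loopThread; }

    // The returned promise completes with the name of the thread that performed the registration.
    public CompletableFuture<String> register() {
        CompletableFuture<String> promise = new CompletableFuture<>();
        if (inEventLoop()) {
            register0(promise);
        } else {
            loop.execute(() -> register0(promise));
        }
        return promise;
    }

    private void register0(CompletableFuture<String> promise) {
        // A real channel would call doRegister() here, i.e. register with the loop's Selector.
        promise.complete(Thread.currentThread().getName());
    }

    public void shutdown() { loop.shutdown(); }

    public static void main(String[] args) throws Exception {
        RegisterOnLoop r = new RegisterOnLoop();
        // Called from main, so the work hops to the loop thread; get() waits on the promise.
        System.out.println("registered on " + r.register().get()); // registered on event-loop
        r.shutdown();
    }
}
```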
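Let's sum up.
- ServerBootstrap sets up two thread groups, bossGroup and workerGroup. Each thread has its own selector, which repeatedly calls select to look for the events it listens for. Normally there is only one listening port, so only one thread in bossGroup is actually working.
- The boss thread's selector has only one ServerSocketChannel registered. When it accepts a client connection, it calls next() on the worker group to get a NioEventLoop, and hands the SocketChannel to that worker to run the subsequent logic.
- NioEventLoop also has an execute method that lets other threads submit Runnable tasks to its internal thread. The main uses are:
  - the boss thread registering a new connection's channel with the worker group when it detects one;
  - the user thread registering the serverChannel with the boss thread when it calls ServerBootstrap's bind.

What Still Needs Further Study
Many questions remain. How does message framing (splitting and merging packets) work? How do ChannelHandlers process data, and how do multiple ChannelHandlers work together? That is for the next installment.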