死磕Netty原始碼之服務端啟動原始碼解析
前言
本部落格講述的是Netty是如何繫結埠、啟動服務。啟動服務的過程中你將會了解到Netty各大核心元件
服務端啟動DEMO
先從一個簡單的服務端啟動DEMO開始,以下是一個標準的Netyy服務端程式碼
public final class NettyServer {
public static void main(String[] args) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel channel) {
ChannelPipeline channelPipeline = channel.pipeline();
channelPipeline.addLast("decoder", new StringDecoder());
channelPipeline.addLast("encoder" , new StringEncoder());
channelPipeline.addLast("handler", new ServerHandler());
}
});
ChannelFuture channelFuture = serverBootstrap.bind(8888).sync();
channelFuture.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
注:ServerBootstrap.childHandler()用於指定處理新連線資料的讀寫處理邏輯,同時ServerBootstrap還提供handler()用於指定在服務端啟動過程中的一些邏輯,通常情況下我們用不著這個方法
ServerHandler程式碼如下:
public class ServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) {
System.out.println("channelActive");
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) {
System.out.println("channelRegistered");
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) {
System.out.println("handlerAdded");
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelReadComplete");
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelInactive");
}
@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("service receive msg:" + msg);
}
}
當有新連線接入時,控制檯打印出
handlerAdded
channelRegistered
channelActive
但接收到新訊息時,控制檯打印出
service receive msg:xxx
channelReadComplete
本文主要分析服務端的啟動過程,而新連線接入 新訊息的讀取會在後續章節中說明
服務端啟動原始碼分析
ServerBootstrap是Netty為方便開發者使用而設計的一個啟動類,ServerBootstrap的核心程式碼入口在bind(),程式碼如下
public ChannelFuture bind(int inetPort) {
return bind(new InetSocketAddress(inetPort));
}
通過埠號建立一個InetSocketAddress,然後繼續bind
public ChannelFuture bind(SocketAddress localAddress) {
// 校驗引數
validate();
if (localAddress == null) {
throw new NullPointerException("localAddress");
}
// Channel繫結邏輯
return doBind(localAddress);
}
validate()驗證服務啟動需要的必要引數,然後呼叫doBind()
private ChannelFuture doBind(final SocketAddress localAddress) {
//...
final ChannelFuture regFuture = initAndRegister();
//...
final Channel channel = regFuture.channel();
//...
doBind0(regFuture, channel, localAddress, promise);
//...
return promise;
}
在doBind()中我們關注兩個核心方法,initAndRegister()以及doBind0()
initAndRegister
final ChannelFuture initAndRegister() {
Channel channel = null;
// 新建Channel
channel = channelFactory.newChannel();
// 初始化Channel
init(channel);
// 將這個Channel Register到某個物件
ChannelFuture regFuture = config().group().register(channel);
return regFuture;
}
新建Channel
Channel是Netty的核心概念之一,它是Netty網路通訊的主體由它負責同對端進行網路通訊、註冊和資料操作等功能。Channel的建立是由channelFactory.newChannel()完成的,ChannelFactory介面定義如下
public interface ChannelFactory<T extends Channel> {
T newChannel();
}
接下來我們跟蹤此處的channelFactory是在何時被初始化,我們層層回溯最終發現是在這個函式中
public B channel(Class<? extends C> channelClass) {
if (channelClass == null) {
throw new NullPointerException("channelClass");
}
return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
}
在Demo程式呼叫.channel(NioServerSocketChannel.class)方法,所以channelFactory.newChannel()真正執行程式碼如下
public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {
private final Class<? extends T> clazz;
public ReflectiveChannelFactory(Class<? extends T> clazz) {
if (clazz == null) {
throw new NullPointerException("clazz");
}
this.clazz = clazz;
}
@Override
public T newChannel() {
try {
return clazz.newInstance();
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " + clazz, t);
}
}
}
即在Netty服務端啟動的時候通過反射的方式來建立一個NioServerSocketChannel物件,最終建立Channel相當於呼叫預設建構函式new出一個 NioServerSocketChannel物件,接下來我們繼續跟進NioServerSocketChannel的預設建構函式
public NioServerSocketChannel() {
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
private static ServerSocketChannel newSocket(SelectorProvider provider) {
try {
// 利用SelectorProvider產生一個ServerSocketChannel物件
return provider.openServerSocketChannel();
} catch (IOException e) {
throw new ChannelException("Failed to open a server socket.", e);
}
}
通過newSocket(DEFAULT_SELECTOR_PROVIDER)建立一條server端channel,然後進入到以下方法
public NioServerSocketChannel(ServerSocketChannel channel) {
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
該方法主要完成兩個功能,首先是呼叫父類的構造方法然後初始化NioServerSocketChannelConfig屬性。我們繼續跟入super()
protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent, ch, readInterestOp);
}
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
this.readInterestOp = readInterestOp;
ch.configureBlocking(false);
}
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
// 設定SelectionKey.OP_ACCEPT事件
this.readInterestOp = readInterestOp;
// 設定ServerSocketChannel為非阻塞的
ch.configureBlocking(false);
}
這裡將前面通過provider.openServerSocketChannel()創建出來的ServerSocketChannel儲存到成員變數,然後呼叫將該channel為非阻塞模式,這是個標準的JDK NIO程式設計的玩法。這裡的readInterestOp即前面層層傳入的SelectionKey.OP_ACCEPT,接下來繼續跟進super(parent);(這裡的parent其實是null,由前面寫死傳入)
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
在AbstractChannel的構造方法中主要是初始化了id,unsafe,pipeline屬性
初始化Channel
在建立完Channel後,我們在init方法中對Channel進行初始化操作,程式碼如下
void init(Channel channel) throws Exception {
// 設定option
final Map<ChannelOption<?>, Object> options = options0();
synchronized (options) {
channel.config().setOptions(options);
}
// 設定attr
final Map<AttributeKey<?>, Object> attrs = attrs0();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
channel.attr(key).set(e.getValue());
}
}
// 設定handler到pipeline上
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
synchronized (childOptions) {
// 設定新接入channel的options
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
}
synchronized (childAttrs) {
// 設定新接入channel的attrs
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
}
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
// 這裡的handler()返回的就是.handler()所設定的值
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
// p.addLast()向serverChannel的流水線處理器中加入了一個ServerBootstrapAcceptor
// 從名字上就可以看出來這是一個接入器,專門接受新請求,把新的請求扔給某個事件迴圈器
pipeline.addLast(new ServerBootstrapAcceptor(currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
以上程式碼主要完成如下功能:先呼叫options0()以及attrs0()獲取到伺服器啟動時設定的一些引數,然後將得到的options和attrs注入到channelConfig或者channel中;然後在當前Channel的pipeline中添加了一個ChannelInitializer,在ChannelInitializer中往pipeline中添加了一個handler,並通過NioEventLoop.execute()方法往pipeline中添加了一個ServerBootstrapAcceptor(請求接入器),此處的NioEventLoop.execute()方法為Netty Reactor執行緒執行的入口,關於Netty Reactor執行緒我們將在下一篇部落格中介紹。我們總結一下發現程式碼執行到這裡Netty並未真正啟動服務,只是初始化了一些基本的配置和屬性,以及在pipeline上加入了一個接入器用來專門接受新連線
完成Channel註冊
完成Channel註冊的程式碼如下
ChannelFuture regFuture = config().group().register(channel);
它呼叫到MultithreadEventLoopGroup中的register方法
@Override
public ChannelFuture register(Channel channel) {
// 呼叫了NioEvenLoop物件中的register方法
// EventLoopGroup extends SingleThreadEventLoop
return next().register(channel);
}
在next方法中返回一個EventLoop物件,每一個EventLoop都與一個selector繫結,在之前的程式碼中EventLoop中的Selector一直沒有任何Channel註冊,所以每次select操作都是空,但從這行程式碼開始這個selector中開始有Channel註冊
public ChannelFuture register(Channel channel) {
return register(new DefaultChannelPromise(channel, this));
}
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
promise.channel().unsafe().register(this, promise);
return promise;
}
這裡可以看到register操作是委託給Channel中的Unsafe物件來執行的,這裡的Unsafe物件對上文稍有印象的同學應該能知道這個就是建立NioServerSocketChannel的時候建立的Unsafe物件,繼續跟進Unsafe物件的register方法
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
// ...
AbstractChannel.this.eventLoop = eventLoop;
// ...
register0(promise);
}
先將EventLoop事件迴圈器繫結到該NioServerSocketChannel上,然後呼叫register0()程式碼如下
private void register0(ChannelPromise promise) {
try {
boolean firstRegistration = neverRegistered;
doRegister();
neverRegistered = false;
registered = true;
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
pipeline.fireChannelRegistered();
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
beginRead();
}
}
} catch (Throwable t) {
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
這一段其實也很清晰,先呼叫doRegister()進行註冊
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
// ...
}
}
}
在這裡我們終於看到JDK底層的Channel註冊到Selector的過程,但是這裡的OPS為0即不關心任何事件,而我們期望OPS的值為SelectionKey.OP_ACCEPT,所以到了這裡程式碼還沒有結束。在執行完Channel註冊後接著執行了幾個pipeline相關的方法,我們後面詳細剖析pipeline的時候再講,然後我們回到doBind0方法中
doBind0
private static void doBind0(final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) {
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
在dobind0()方法中通過EventLoop執行一個任務,接下來我們進入到channel.bind()方法
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
return pipeline.bind(localAddress, promise);
}
public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
return tail.bind(localAddress, promise);
}
關於Pipeline相關的內容將在後續部落格中介紹,當前一個比較好的方式就是Debug單步進入。最後我們來到了如下區域
public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception {
unsafe.bind(localAddress, promise);
}
這裡的unsafe就是前面提到的AbstractUnsafe, 準確點應該是NioMessageUnsafe
@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
// ...
boolean wasActive = isActive();
// ...
doBind(localAddress);
if (!wasActive && isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireChannelActive();
}
});
}
safeSetSuccess(promise);
}
在doBind方法中完成繫結操作,程式碼如下
protected void doBind(SocketAddress localAddress) throws Exception {
if (PlatformDependent.javaVersion() >= 7) {
javaChannel().bind(localAddress, config.getBacklog());
} else {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
}
最終呼叫到了JDK裡面的bind方法真正進行了埠的繫結。按照正常流程我們前面已經分析到isActive()方法返回false,進入到 doBind()之後如果channel被激活了,就發起pipeline.fireChannelActive()呼叫。接著我們跟進pipeline.channelActive方法
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelActive();
readIfIsAutoRead();
}
pipeline.channelActive會逐一呼叫pipeline中每一個節點的channelActive方法,並且在HeadContext中呼叫了readIfIsAutoRead
private void readIfIsAutoRead() {
if (channel.config().isAutoRead()) {
channel.read();
}
}
最終這個方法會呼叫到AbstractNioChannel的doBeginRead方法
protected void doBeginRead() throws Exception {
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}
readPending = true;
final int interestOps = selectionKey.interestOps();
if ((interestOps & readInterestOp) == 0) {
selectionKey.interestOps(interestOps | readInterestOp);
}
}
在最後一行中的readInterestOp即在上文中提到的SelectionKey.OP_ACCEPT,至此完成了Channel對ACCEPT事件的註冊過程
總結
到目前為止我們看到的程式碼相當於傳統NIO程式設計中的如下程式碼
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); => 建立NioServerSocketChannel
serverSocketChannel.configureBlocking(false); => AbstractNioChannel中ch.configureBlocking(false);
serverSocketChannel.bind(new InetSocketAddress("localhost", 8888)); => NioServerSocketChannel.doBind()
Selector selector = Selector.open(); => NioEventLoop.openSelector()
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT) => AbstractNioChannel.doBeginRead()
服務端啟動完成的主要功能為建立一個Channel,並且將Channel註冊到NioEventLoop的Selector上