Netty框架學習之路(六)—— 引導
前言
之前的博文介紹了Channel、EventLoop及由其衍生出來的相關概念。但這些都是靜態的概念,為了能使Netty程式執行起來,必須要有一個全域性的元件將上述內容串聯起來,這便就是本文所講內容——引導。
引導類
引導類的層次結構如下:
主要包括一個抽象父類(AbstractBootstrap)和兩個具體的子類(Bootstrap、ServerBootstrap)。從類名可以看出,Bootstrap主要用於客戶端的引導,只需要一個單獨的、沒有父Channel的Channel來參與網路互動;ServerBootstrap用於服務端、父Channel接收客戶端連線,子Channel處理與客戶端的通訊。
本文以ServerBootstrap為例,介紹Netty如何發揮引導作用的。
ServerBootstrap
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.childHandler (new ChannelInitializer<SocketChannel>(){
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new TimeServerHandler());
}
});
ChannelFuture f = b.bind (port).sync();
這是之前Netty示例程式中的一段程式碼,實現的功能就是建立一個ServerBootstrap服務端引導類,初始化並繫結至指定埠,等待客戶端連線請求。
Netty服務端初始化過程在ServerBoostrap的bind(port)方法中,而bind 方法會觸發如下的呼叫鏈:
AbstractBootstrap.bind -> AbstractBootstrap.doBind -> AbstractBootstrap.initAndRegister
從方法名稱可以看出,initAndRegister方法包含了最核心的處理邏輯,具體程式碼如下:
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
//1,使用反射方式,呼叫預設構造方法,建立一個channel
channel = channelFactory.newChannel();
//2,初始化channel
init(channel);
}
……
//3,註冊channel
ChannelFuture regFuture = config().group().register(channel);
……
}
initAndRegister方法通過反射方式建立一個Channel,而Channel的型別是ServerBootstrap的channel()方法所設定的,此處型別為NioServerSocketChannel,之前的博文中提過,通過呼叫 SelectorProvider.openSocketChannel() 來開啟一個新的 Java NIO SocketChannel,同時將這個Channel與selector相關聯並關注OP_ACCEPT事件,同時又建立一個ChannelPipeline與之相關聯。
之後呼叫init方法對Channe進行初始化操作,具體程式碼如下:
//此處省去很多跟主邏輯關係不大的程式碼
ChannelPipeline p = channel.pipeline();
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
針對Channel初始化過程中建立的ChannelPipeline新增ChannelHandler,此處新增的ChannelInitializer是一個ChannelHandler的介面卡類,通過此匿名類設定回撥方法等待回撥。之前我們講過Netty採用了事件機制,當Channel成功註冊到一個eventLoop的Selector中, 並且將當前Channel作為attachment時,便產生了一個Inbound事件觸發了ChannelHandlerContext.fireChannelRegistered()方法,那Channel何時註冊到一個eventLoop的Selector呢?我們繼續往下看。
channel初始化完成後,便是註冊處理,之前關於Netty執行緒模型的博文中提到,channel會和一個EventLoop繫結:
ChannelFuture regFuture = config().group().register(channel);
public ChannelFuture register(Channel channel) {
return next().register(channel);
}
public ChannelFuture register(Channel channel) {
return register(new DefaultChannelPromise(channel, this));
}
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
promise.channel().unsafe().register(this, promise);
return promise;
}
group()方法返回的是主EventLoopGroup,上述程式碼的過程就是將之前建立的NioServerSocketChannel註冊到主EventLoopGroup中的某一個EventLoop上。next()方法返回一個EventLoop,沿著呼叫鏈,最後進入到AbstractUnsafe的register0方法,程式碼如下:
private void register0(ChannelPromise promise) {
……
boolean firstRegistration = neverRegistered;
doRegister();
neverRegistered = false;
registered = true;
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
pipeline.fireChannelRegistered();
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
beginRead();
}
}
……
}
protected void doRegister() throws Exception {
// 省略錯誤處理
selectionKey = javaChannel().register(eventLoop().selector, 0, this);
}
此處AbstractNioChannel.doRegister 方法將之前建立的NioServerSocketChannel註冊到一個eventLoop的Selector中, 並且將當前 Channel 作為 attachment。
同時我們看到了前文涉及到的fireChannelRegistered方法,以此觸發Channel自身的初始化新增ChannelHandler的過程,具體過程之前的博文已經介紹過,不再贅述。
再回到主流程,當完成對Channel的初始化和註冊工作之後,便開始繫結操作,
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
可以看到此處是非同步呼叫Channel的bind方法實現非同步繫結,之前的博文介紹過,Channel的bind方法會產生一個出站事件,那我們進入到bind方法內部看一下,
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
return pipeline.bind(localAddress, promise);
}
public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
return tail.bind(localAddress, promise);
}
public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
if (localAddress == null) {
throw new NullPointerException("localAddress");
}
if (isNotValidPromise(promise, false)) {
// cancelled
return promise;
}
final AbstractChannelHandlerContext next = findContextOutbound();
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeBind(localAddress, promise);
} else {
safeExecute(executor, new Runnable() {
@Override
public void run() {
next.invokeBind(localAddress, promise);
}
}, promise, null);
}
return promise;
}
這部分程式碼看起來好像很熟悉,的確如此,之前博文中有關於Inbound事件觸發InboundChannelHandler初始化,這裡便是Outbound事件觸發OutboundChannelHandler初始化,過程基本相同,不再贅述。
到這裡,Netty服務端程式初始化完畢,等待客戶端連線。當一個客戶端連線到服務端時, Java 底層的 NIOServerSocketChannel 會有產生一個 SelectionKey.OP_ACCEPT 就緒事件, 接著就會呼叫到 NioServerSocketChannel.doReadMessages方法,
protected int doReadMessages(List<Object> buf) throws Exception {
……
SocketChannel ch = SocketUtils.accept(javaChannel());
buf.add(new NioSocketChannel(this, ch));
……
}
在 doReadMessages 中,先獲取客戶端新連線的 SocketChannel,接著例項化一個 NioSocketChannel,接下來就經由 Netty 的 ChannelPipeline 機制,將讀取事件逐級傳送到各個 handler 中,於是就會觸發前面我們提到的 ServerBootstrapAcceptor.channelRead 方法啦。
總結
Netty的引導過程比較複雜,本文只是大概梳理了一下過程,希望能幫助讀者理解此過程。