Netty原始碼分析:服務端啟動全過程(篇幅很長)
Netty原始碼分析:服務端啟動全過程
先說結論,Netty 服務端啟動和互動的邏輯的底層實現是藉助於Java NIO ServerSocketChannel來實現,Java NIO ServerSocketChannel作為服務端的繫結埠、接受客戶端的連線的樣式程式碼如下:
/*
* 既然是伺服器端,肯定需要一個ServerSocketChannel來監聽新進來的TCP連線。
* */
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
//監聽指定的埠號
serverSocketChannel.socket().bind(new InetSocketAddress(9999));
//檢測是否有客戶端連線進來
while(true){
SocketChannel socketChannel = serverSocketChannel.accept();
//do something....
}
//在使用完畢後,會進行關閉
serverSocketChannel.close();
結論已經說完了,或許看完這篇博文,你才會明白這個結論哈,不急,慢慢看。
一般Netty服務端的程式碼如下所示:
public final class SimpleServer {
public static void main(String[] args) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new SimpleServerHandler())
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
}
});
ChannelFuture f = b.bind(8888).sync();
f.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
private static class SimpleServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelActive");
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelRegistered");
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
System.out.println("handlerAdded");
}
}
}
在前面兩篇博文中從原始碼的角度分析瞭如下幾行程式碼主要做了哪些工作。
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new SimpleServerHandler())
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
}
});
本篇博文將從原始碼的角度分析ChannelFuture f = b.bind(8888).sync()
的內部實現。
這樣就完成了Netty伺服器端啟動過程的原始碼分析。
原始碼分析ChannelFuture f = b.bind(8888).sync()
通過b.bind(8888)來啟動服務,下面將來進行分析。
AbstractBootstrap.java
public ChannelFuture bind(int inetPort) {
return bind(new InetSocketAddress(inetPort));
}
其中,new InetSocketAddress(inetPort)物件如下所示:
這個函式沒什麼好說的,呼叫了引數為InetSocketAddress的bind方法,該方法程式碼如下。
public ChannelFuture bind(SocketAddress localAddress) {
validate();//相關引數的檢查
if (localAddress == null) {
throw new NullPointerException("localAddress");
}
return doBind(localAddress);//下面將分析
}
該函式主要看兩點:validate()和doBind(localAddress)
1、先看validate()方法
//函式功能:檢查相關引數是否設定了
@SuppressWarnings("unchecked")
public B validate() {
if (group == null) {//這裡的group指的是:b.group(bossGroup, workerGroup)程式碼中的bossGroup
throw new IllegalStateException("group not set");
}
if (channelFactory == null) {
throw new IllegalStateException("channel or channelFactory not set");
}
return (B) this;
}
該方法主要檢查了兩個引數,一個是group,一個是channelFactory,在這裡可以想一想這兩個引數是在哪裡以及何時被賦值的?答案是在如下程式碼塊中被賦值的,其中是將bossGroup賦值給了group,將BootstrapChannelFactory賦值給了channelFactory.
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
2、接下來看bind方法中的doBind(localAddress)方法
doBind方法的原始碼如下:
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();//1
final Channel channel = regFuture.channel();//2
if (regFuture.cause() != null) {
return regFuture;
}
final ChannelPromise promise;
if (regFuture.isDone()) {
promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
} else {
// Registration future is almost always fulfilled already, but just in case it's not.
promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
doBind0(regFuture, channel, localAddress, promise);
}
});
}
return promise;
}
doBind這個函式是我們要分析的重點,這個函式的主要工作有如下幾點:
1、通過initAndRegister()方法得到一個ChannelFuture的例項regFuture。
2、通過regFuture.cause()方法判斷是否在執行initAndRegister方法時產生來異常。如果產生來異常,則直接返回,如果沒有產生異常則進行第3步。
3、通過regFuture.isDone()來判斷initAndRegister方法是否執行完畢,如果執行完畢來返回true,然後呼叫doBind0進行socket繫結。如果沒有執行完畢則返回false進行第4步。
4、regFuture會新增一個ChannelFutureListener監聽,當initAndRegister執行完成時,呼叫operationComplete方法並執行doBind0進行socket繫結。
第3、4點想幹的事就是一個:呼叫doBind0方法進行socket繫結。
下面將分成4部分對每行程式碼具體做了哪些工作進行詳細分析。
第1部分: final ChannelFuture regFuture = initAndRegister()
下面將先分析這行程式碼做了些什麼,該方法的具體程式碼如下:
final ChannelFuture initAndRegister() {
//結論:這裡的channel為一個NioServerSocketChannel物件,具體分析見後面
final Channel channel = channelFactory().newChannel();//1
try {
init(channel);//2
} catch (Throwable t) {
channel.unsafe().closeForcibly();
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
ChannelFuture regFuture = group().register(channel);//3
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
// If we are here and the promise is not failed, it's one of the following cases:
// 1) If we attempted registration from the event loop, the registration has been completed at this point.
// i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
// 2) If we attempted registration from the other thread, the registration request has been successfully
// added to the event loop's task queue for later execution.
// i.e. It's safe to attempt bind() or connect() now:
// because bind() or connect() will be executed *after* the scheduled registration task is executed
// because register(), bind(), and connect() are all bound to the same thread.
return regFuture;
}
通過函式名以及內部呼叫的函式可以猜測該函式幹了兩件事情:
1、初始化一個Channel,要想初始化,肯定要先得到一個Channel,是吧。
final Channel channel = channelFactory().newChannel();//1
init(channel);//2
2、將Channel進行註冊,至於註冊到哪裡,目前不知道。
ChannelFuture regFuture = group().register(channel);//3
下面我們將分析這幾行程式碼內部幹來些什麼。
1.1部分 final Channel channel = channelFactory().newChannel();
在博文Netty原始碼分析:ServerBootstrap分析中,我們知道b.channel(NioServerSocketChannel.class)
的功能為:設定父類屬性channelFactory 為: BootstrapChannelFactory類的物件。其中這裡BootstrapChannelFactory物件中包括一個clazz屬性為:NioServerSocketChannel.class
因此,final Channel channel = channelFactory().newChannel();就是呼叫的BootstrapChannelFactory類中的newChannel()方法,該方法的具體內容為:
private static final class BootstrapChannelFactory<T extends Channel> implements ChannelFactory<T> {
private final Class<? extends T> clazz;
BootstrapChannelFactory(Class<? extends T> clazz) {
this.clazz = clazz;
}
@Override
public T newChannel() {
try {
return clazz.newInstance();
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " + clazz, t);
}
}
@Override
public String toString() {
return StringUtil.simpleClassName(clazz) + ".class";
}
}
看到這個類,我們可以得到的結論:final Channel channel = channelFactory().newChannel();
這行程式碼的作用為通過反射產生來一個NioServerSocketChannel類的例項。
下面將看下NioServerSocketChannel類的建構函式做了哪些工作。
NioServerSocketChannel類的繼承體系結構如下:
其無參建構函式如下:
public NioServerSocketChannel() {
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
無參建構函式中SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider()
。
函式newSocket的功能為:利用SelectorProvider產生一個SocketChannelImpl物件。
private static ServerSocketChannel newSocket(SelectorProvider provider) {
try {
return provider.openServerSocketChannel();
} catch (IOException e) {
throw new ChannelException(
"Failed to open a server socket.", e);
}
}
public SocketChannel openSocketChannel() throws IOException {
return new SocketChannelImpl(this);
}
無參建構函式通過newSocket函式產生了一個SocketChannelImpl物件,然後呼叫瞭如下建構函式,我們繼續看
public NioServerSocketChannel(ServerSocketChannel channel) {
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
//父類AbstractNioMessageChannel的建構函式
protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent, ch, readInterestOp);
}
//父類 AbstractNioChannel的建構函式
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
this.readInterestOp = readInterestOp;//SelectionKey.OP_ACCEPT
try {
ch.configureBlocking(false);//設定當前的ServerSocketChannel為非阻塞的
} catch (IOException e) {
try {
ch.close();
} catch (IOException e2) {
if (logger.isWarnEnabled()) {
logger.warn(
"Failed to close a partially initialized socket.", e2);
}
}
throw new ChannelException("Failed to enter non-blocking mode.", e);
}
}
//父類AbstractChannel的建構函式
protected AbstractChannel(Channel parent) {
this.parent = parent;
unsafe = newUnsafe();
pipeline = new DefaultChannelPipeline(this);
}
new NioServerSocketChannel()產生一個例項物件時,呼叫上面這麼多建構函式主要乾了兩件事情:
1、產生來一個SocketChannelImpl類的例項,並設定為非阻塞的。
2、設定了config屬性
config = new NioServerSocketChannelConfig(this, javaChannel().socket()
3、設定SelectionKey.OP_ACCEPT事件
this.readInterestOp = readInterestOp;//SelectionKey.OP_ACCEPT
4、設定unsafe屬性
@Override
protected AbstractNioUnsafe newUnsafe() {
return new NioMessageUnsafe();
}
主要作用為:用來負責底層的connect、register、read和write等操作。
5、設定pipeline屬性
pipeline = new DefaultChannelPipeline(this);
每個Channel都有自己的pipeline,當有請求事件發生時,pipeline負責呼叫相應的hander進行處理。
這些屬性在後面都會用到,至於NioServerSocketChannel 物件中的unsafe、pipeline屬性的具體實現後面進行分析。
結論:final Channel channel = channelFactory().newChannel();
這行程式碼的作用為通過反射產生來一個NioServerSocketChannel類的例項,其中這個NioServerSocketChannel類物件有這樣幾個屬性:SocketChannel、NioServerSocketChannelConfig 、SelectionKey.OP_ACCEPT事件、NioMessageUnsafe、DefaultChannelPipeline
1.2 init(channel)
init方法的具體程式碼如下:
@Override
void init(Channel channel) throws Exception {
//1、設定新接入channel的option
final Map<ChannelOption<?>, Object> options = options();
synchronized (options) {
channel.config().setOptions(options);//NioServerSocketChannelConfig
}
//2、設定新接入channel的attr
final Map<AttributeKey<?>, Object> attrs = attrs();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
channel.attr(key).set(e.getValue());
}
}
//3、設定handler到pipeline上
ChannelPipeline p = channel.pipeline();
if (handler() != null) {//這裡的handler()返回的就是第二部分.handler(new SimpleServerHandler())所設定的SimpleServerHandler
p.addLast(handler());
}
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
}
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
}
/*
p.addLast()向serverChannel的流水線處理器中加入了一個 ServerBootstrapAcceptor,
從名字上就可以看出來,這是一個接入器,專門接受新請求,把新的請求扔給某個事件迴圈器
*/
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(new ServerBootstrapAcceptor(
currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
該函式的功能為:
1、設定channel的options
如果沒有設定,則options為空,該屬性在ServerBootstrap類中的定義如下
Map<ChannelOption<?>, Object> options = new LinkedHashMap<ChannelOption<?>, Object>();
options可能如下:
public <T> boolean setOption(ChannelOption<T> option, T value) {
validate(option, value);
if (option == CONNECT_TIMEOUT_MILLIS) {
setConnectTimeoutMillis((Integer) value);
} else if (option == MAX_MESSAGES_PER_READ) {
setMaxMessagesPerRead((Integer) value);
} else if (option == WRITE_SPIN_COUNT) {
setWriteSpinCount((Integer) value);
} else if (option == ALLOCATOR) {
setAllocator((ByteBufAllocator) value);
} else if (option == RCVBUF_ALLOCATOR) {
setRecvByteBufAllocator((RecvByteBufAllocator) value);
} else if (option == AUTO_READ) {
setAutoRead((Boolean) value);
} else if (option == AUTO_CLOSE) {
setAutoClose((Boolean) value);
} else if (option == WRITE_BUFFER_HIGH_WATER_MARK) {
setWriteBufferHighWaterMark((Integer) value);
} else if (option == WRITE_BUFFER_LOW_WATER_MARK) {
setWriteBufferLowWaterMark((Integer) value);
} else if (option == MESSAGE_SIZE_ESTIMATOR) {
setMessageSizeEstimator((MessageSizeEstimator) value);
} else {
return false;
}
return true;
}
2、設定channel的attrs
如果沒有設定,則attrs為空,該屬性在ServerBootstrap類中的定義如下
private final Map<AttributeKey<?>, Object> attrs = new LinkedHashMap<AttributeKey<?>, Object>();
3、設定handler到channel的pipeline上
其中,這裡的handler為:在博文Netty原始碼分析:ServerBootstrap分析中分析的通過b.handler(new SimpleServerHandler())
所設定的SimpleServerHandler物件
4、在pipeline上新增來一個ChannelInitializer物件,其中重寫來initChannel方法。該方法通過p.addLast()向serverChannel的流水線處理器中加入了一個 ServerBootstrapAcceptor,
從名字上就可以看出來,這是一個接入器,專門接受新請求,把新的請求扔給某個事件迴圈器
看到這裡,我們發現其實init只是初始化了一些基本的配置和屬性,以及在pipeline上加入了一個接入器,用來專門接受新連線,並沒有啟動服務.
1.3 ChannelFuture regFuture = group().register(channel)
前面的分析我們知道group為:NioEvenLoopGroup,其繼承MultithreadEventLoopGroup,該類中的register方法如下:
@Override
public ChannelFuture register(Channel channel) {
return next().register(channel);//呼叫了NioEvenLoop物件中的register方法,NioEventLoop extends SingleThreadEventLoop
}
next()方法的程式碼如下,其功能為選擇下一個NioEventLoop物件。
@Override
public EventExecutor next() {
return chooser.next();//呼叫MultithreadEventExecutorGroup中的next方法
}
在博文Netty原始碼分析:NioEventLoopGroup分析的分析中,我們在MultithreadEventExecutorGroup類的建構函式中看到:根據執行緒個數nThreads是否為2的冪次方來選擇chooser,其中這兩個chooser為: PowerOfTwoEventExecutorChooser、GenericEventExecutorChooser
這兩個chooser功能都是一樣,這是求餘的方式不一樣。
next方法返回的是一個NioEvenLoop物件,至於children何時被初始化的,是在MultithreadEventExecutorGroup的建構函式中被初始化的,即 執行EventLoopGroup bossGroup = new NioEventLoopGroup(1);
時被初始化的, 具體可以看博文Netty原始碼分析:NioEventLoopGroup分析的分析。
private final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
@Override
public EventExecutor next() {
return children[childIndex.getAndIncrement() & children.length - 1];//利用2的N次方法的特點,使用&求餘更快。
}
}
private final class GenericEventExecutorChooser implements EventExecutorChooser {
@Override
public EventExecutor next() {
return children[Math.abs(childIndex.getAndIncrement() % children.length)];
}
}
結論:由於NioEventLoopGroup中維護著多個NioEventLoop,next方法回撥用chooser策略找到下一個NioEventLoop,並執行該物件的register方法進行註冊。
由於NioEventLoop extends SingleThreadEventLoop,NioEventLoop沒有重寫該方法,因此看 SingleThreadEventLoop類中的register方法
@Override
public ChannelFuture register(Channel channel) {
return register(channel, new DefaultChannelPromise(channel, this));
}
@Override
public ChannelFuture register(final Channel channel, final ChannelPromise promise) {
if (channel == null) {
throw new NullPointerException("channel");
}
if (promise == null) {
throw new NullPointerException("promise");
}
channel.unsafe().register(this, promise);
return promise;
}
在本博文第1部分的NioServerSocketChannel例項化中設定來unsafe屬性,具體是呼叫如下的方法來設定的,因此這裡的channel.unsafe()
就是NioMessageUnsafe例項。
@Override
protected AbstractNioUnsafe newUnsafe() {
return new NioMessageUnsafe();
}
NioMessageUnsafe沒有重寫register方法,NioMessageUnsafe extends AbstractNioUnsafe,AbstractNioUnsafe extends AbstractUnsafe
因此,channel.unsafe().register(this, promise)這行程式碼呼叫的是AbstractUnsafe類中的register方法,具體程式碼如下:
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
if (eventLoop == null) {
throw new NullPointerException("eventLoop");
}
//判斷該channel是否已經被註冊到EventLoop中
if (isRegistered()) {
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
if (!isCompatible(eventLoop)) {
promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}
//1 將eventLoop設定在NioServerSocketChannel上
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {//判斷當前執行緒是否為該EventLoop中擁有的執行緒,如果是,則直接註冊,如果不是,則新增一個任務到該執行緒中
register0(promise);
} else {
try {
eventLoop.execute(new OneTimeTask() { //重點
@Override
public void run() {
register0(promise);//分析
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}
上面的重點是register0(promise)方法。基本邏輯為:
1、通過呼叫eventLoop.inEventLoop()方法判斷當前執行緒是否為該EventLoop中擁有的執行緒,如果是,則直接註冊,如果不是,說明該EventLoop在等待並沒有執行權,則進行第二步。
AbstractEventExecutor.java
@Override
public boolean inEventLoop() {
return inEventLoop(Thread.currentThread());
}
SingleThreadEventExecutor.java
@Override
public boolean inEventLoop(Thread thread) {
return thread == this.thread;
}
2、既然該EventLoop中的執行緒此時沒有執行權,但是我們可以提交一個任務到該執行緒中,等該EventLoop的執行緒有執行權的時候就自然而然的會執行此任務,而該任務負責呼叫register0方法,這樣也就達到了呼叫register0方法的目的。具體為:任務OneTimeTask子類被提交到NioEventLoop執行緒中執行,然後呼叫此任務的run方法,進而呼叫register0方法,其中promise = new DefaultChannelPromise(channel, this)。
下面看register0這個方法,具體程式碼如下:
private void register0(ChannelPromise promise) {
try {
// check if the channel is still open as it could be closed in the mean time when the register
// call was outside of the eventLoop
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
doRegister();
registered = true;
safeSetSuccess(promise);
pipeline.fireChannelRegistered();//執行完,控制檯輸出:channelRegistered
if (isActive()) { //分析
pipeline.fireChannelActive();
}
} catch (Throwable t) {
// Close the channel directly to avoid FD leak.
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
在上面的程式碼中,是通過呼叫doRegister()方法完成NioServerSocketChannel的註冊,該方法的具體程式碼如下:
@Override
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
selectionKey = javaChannel().register(eventLoop().selector, 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
// Force the Selector to select now as the "canceled" SelectionKey may still be
// cached and not removed because no Select.select(..) operation was called yet.
eventLoop().selectNow();
selected = true;
} else {
// We forced a select operation on the selector before but the SelectionKey is still cached
// for whatever reason. JDK bug ?
throw e;
}
}
}
}
protected SelectableChannel javaChannel() {
return ch;
}
在本博文的第1部分的NioServerSocketChannel的例項化分析中,我們知道這裡的javaChannel()方法返回的ch為例項化NioServerSocketChannel時產生的一個SocketChannelImpl類的例項,並設定為非阻塞的,具體見本博文的第1部分。
selectionKey = javaChannel().register(eventLoop().selector, 0, this);就完成了ServerSocketChannel註冊到Selector中。
回顧下,這裡的eventLoop().selector是什麼?答案是:KQueueSelectorImpl物件。
NioEventLoop(NioEventLoopGroup parent, ThreadFactory threadFactory, SelectorProvider selectorProvider) {
super(parent, threadFactory, false);
if (selectorProvider == null) {
throw new NullPointerException("selectorProvider");
}
provider = selectorProvider;
selector = openSelector();
}
private Selector openSelector() {
final Selector selector;
try {
selector = provider.openSelector();
} catch (IOException e) {
throw new ChannelException("failed to open a new selector", e);
}
//...省略了一部分程式碼
return selector;
}
通過回溯NioEventLoop到NioEventLoopGroup建構函式中,我們知道這裡的provider是通過呼叫SelectorProvider.provider()產生的new KQueueSelectorProvider()
物件,該類KQueueSelectorProvider中的openSelector()方法的程式碼如下:
public AbstractSelector openSelector() throws IOException {
return new KQueueSelectorImpl(this);
}
ServerSocketChannel註冊完之後,接著執行pipeline.fireChannelRegistered方法。
@Override
public ChannelHandlerContext fireChannelRegistered() {
final AbstractChannelHandlerContext next = findContextInbound();
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRegistered();
} else {
executor.execute(new OneTimeTask() {
@Override
public void run() {
next.invokeChannelRegistered();
}
});
}
return this;
}
private void invokeChannelRegistered() {
try {
((ChannelInboundHandler) handler()).channelRegistered(this);//呼叫我們自己寫的Handler然後通過.handler設定後Handler
} catch (Throwable t) {
notifyHandlerException(t);
}
}
pipeline中維護了handler連結串列,還記得之前.handler(new SimpleServerHandler())
初始化的handler在本博文的第1.2部分的分析中介紹了此handler被新增到此pipeline中了,通過遍歷連結串列,執行InBound型別handler的channelRegistered方法,最終執行init中新增的ChannelInitializer handler。
因此執行到這裡,我們的控制檯就回輸出:channelRegistered,這行資訊。
到這裡,我們就將doBind方法final ChannelFuture regFuture = initAndRegister();
給分析完了,得到的結論如下:
1、通過反射產生了一個NioServerSocketChannle物件。
2、完成了初始化
3、將NioServerSocketChannel進行了註冊。
接下來我們分析doBind方法的剩餘部分程式碼主要做了什麼,
原始碼如下:
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();//1
final Channel channel = regFuture.channel();//2
if (regFuture.cause() != null) {
return regFuture;
}
final ChannelPromise promise;
if (regFuture.isDone()) {
promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
} else {
// Registration future is almost always fulfilled already, but just in case it's not.
promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
doBind0(regFuture, channel, localAddress, promise);
}
});
}
return promise;
}
第二部分:doBind0(regFuture, channel, localAddress, promise);
其中regFuture為:DefaultChannelPromise,channel:NioServerSocketChannel
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
// This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up
// the pipeline in its channelRegistered() implementation.
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
該函式主要是提交了一個Runnable任務到NioEventLoop執行緒中來進行處理。,這裡先看一下NioEventLoop類的execute方法
@Override
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
boolean inEventLoop = inEventLoop();//判斷當前執行緒是否為該NioEventLoop所關聯的執行緒,如果是,則新增任務到任務佇列中,如果不是,則先啟動執行緒,然後新增任務到任務佇列中去
if (inEventLoop) {
addTask(task);
} else {
startThread();
addTask(task);
//如果
if (isShutdown() && removeTask(task)) {
reject();
}
}
if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}
當提交的任務被執行緒執行後,則會執行channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE)這行程式碼,這行程式碼完成的功能為:實現channel與埠的繫結。
具體如下:
AbstractChannel.java
@Override
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
return pipeline.bind(localAddress, promise);
}
在該方法中直接呼叫了pipeline的bind方法,這裡的pipeline時DefaultChannelPipeline的例項。
DefaultChannelPipeline.java
@Override
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
return tail.bind(localAddress, promise);
}
在上面方法中直接呼叫了TailContext例項tail的bind方法,tail在博文 Netty原始碼分析:ChannelPipeline中有詳細的介紹。
繼續看tail例項的bind方法
AbstractChannelHandlerContext.java
@Override
public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
//...省略有效性檢查
final AbstractChannelHandlerContext next = findContextOutbound();//
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeBind(localAddress, promise);
} else {
safeExecute(executor, new OneTimeTask() {
@Override
public void run() {
next.invokeBind(localAddress, promise);
}
}, promise, null);
}
return promise;
}
此上面bind函式中的這行程式碼:final AbstractChannelHandlerContext next = findContextOutbound();所完成的任務就是在pipeline所持有的以AbstractChannelHandlerContext為節點的雙向連結串列中從尾節點tail開始向前尋找第一個outbound=true的handler節點。
private AbstractChannelHandlerContext findContextOutbound() {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.prev;
} while (!ctx.outbound);
return ctx;
}
閱讀博文 Netty原始碼分析:ChannelPipeline中我們知道:在 DefaultChannelPipeline 的構造器中, 會例項化兩個物件: head 和 tail, 並形成了雙向連結串列的頭和尾。 head 是 HeadContext 的例項, 它實現了 ChannelOutboundHandler 介面, 並且它的 outbound 欄位為 true.而tail 是 TailContext 的例項,它實現了ChannelInboundHandler 介面,並且其outbound 欄位為 false,inbound 欄位為true。 基於此在如上的bind函式中呼叫了 findContextOutbound方法 找到的 AbstractChannelHandlerContext 物件其實就是 head.
繼續看,在pipelie的雙向連結串列中找到第一個outbound=true的AbstractChannelHandlerContext節點head後,然後呼叫此節點的invokeConnect方法,該方法的程式碼如下:
private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
try {
((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
}
HeadContext類中的handler()方法程式碼如下:
@Override
public ChannelHandler handler() {
return this;
}
該方法返回的是其本身,這是因為HeadContext由於其繼承AbstractChannelHandlerContext以及實現了ChannelHandler介面使其具有Context和Handler雙重特性。
繼續看,看HeadContext類中的bind方法,程式碼如下:
@Override
public void bind(
ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
throws Exception {
unsafe.bind(localAddress, promise);
}
unsafe這個欄位是在HeadContext建構函式中被初始化的,如下:
HeadContext(DefaultChannelPipeline pipeline) {
super(pipeline, null, HEAD_NAME, false, true);
unsafe = pipeline.channel().unsafe();
}
而此建構函式中的pipeline.channel().unsafe()這行程式碼返回的就是在本博文前面研究NioServerSocketChannel這個類的建構函式中所初始化的一個例項,如下: