【6】netty4原始碼分析-accept
轉自 http://xw-z1985.iteye.com/blog/1941800
本文分析服務端如何accept客戶端的connect請求,首先看下selector的I/O多路複用的分發邏輯:
//NioEventLoop private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { final NioUnsafe unsafe = ch.unsafe(); if (!k.isValid()) { // close the channel if the key is not valid anymore unsafe.close(unsafe.voidPromise()); return; } try { int readyOps = k.readyOps(); if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read(); if (!ch.isOpen()) { // Connection already closed - no need to handle write. return; } } if ((readyOps & SelectionKey.OP_WRITE) != 0) { // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write ch.unsafe().forceFlush(); } if ((readyOps & SelectionKey.OP_CONNECT) != 0) { // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking // See https://github.com/netty/netty/issues/924 int ops = k.interestOps(); ops &= ~SelectionKey.OP_CONNECT; k.interestOps(ops); unsafe.finishConnect(); } } catch (CancelledKeyException e) { unsafe.close(unsafe.voidPromise()); } }
當有OP_ACCEPT事件到達時,分發給NioMessageUnsafe的read方法進行處理。
//NioMessageUnsafe public void read() { assert eventLoop().inEventLoop(); final SelectionKey key = selectionKey(); if (!config().isAutoRead()) { int interestOps = key.interestOps(); if ((interestOps & readInterestOp) != 0) { // only remove readInterestOp if needed key.interestOps(interestOps & ~readInterestOp); } } final ChannelConfig config = config(); final int maxMessagesPerRead = config.getMaxMessagesPerRead(); final boolean autoRead = config.isAutoRead(); final ChannelPipeline pipeline = pipeline(); boolean closed = false; Throwable exception = null; try { for (;;) { int localRead = doReadMessages(readBuf); if (localRead == 0) { break; } if (localRead < 0) { closed = true; break; } if (readBuf.size() >= maxMessagesPerRead | !autoRead) { break; } } } catch (Throwable t) { exception = t; } int size = readBuf.size(); for (int i = 0; i < size; i ++) { pipeline.fireChannelRead(readBuf.get(i)); } readBuf.clear(); pipeline.fireChannelReadComplete(); if (exception != null) { if (exception instanceof IOException) { // ServerChannel should not be closed even on IOException because it can often continue // accepting incoming connections. (e.g. too many open files) closed = !(AbstractNioMessageChannel.this instanceof ServerChannel); } pipeline.fireExceptionCaught(exception); } if (closed) { if (isOpen()) { close(voidPromise()); } } } }
其中doReadMessages方法由NioServerSocketChannel實現:
// NioServerSocketChannel protected int doReadMessages(List<Object> buf) throws Exception { SocketChannel ch = javaChannel().accept(); try { if (ch != null) { buf.add(new NioSocketChannel(this, ch)); return 1; } } catch (Throwable t) { logger.warn("Failed to create a new channel from an accepted socket.", t); try { ch.close(); } catch (Throwable t2) { logger.warn("Failed to close a socket.", t2); } } return 0; }
SocketChannel ch = javaChannel().accept()就為接受的客戶端連線建立了一個已連線套接字socketChannel.
buf.add(new NioSocketChannel(this, ch))會構造一個NioSocketChannel,並將其快取到buf中(buf是一個List)。該NioSocketChannel的模式為非阻塞,readInterestOp為SelectionKey.OP_READ,並建立對應的管道和NioByteUnsafe例項。
maxMessagesPerRead表示如果此時有多個connect,那麼只有當SeverSocketChannel建立的已連線套接字個數超過maxMessagesPerRead後,才會對每個已連線套接字觸發channelRead事件。maxMessagesPerRead的預設值是16.
接下來分析channelRead事件做了什麼事情:
channelRead是Inbound事件,會呼叫ServerBootstrapAcceptor的channelRead方法:
// ServerBootstrapAcceptor
public void channelRead(ChannelHandlerContext ctx, Object msg) {
Channel child = (Channel) msg;
child.pipeline().addLast(childHandler);
for (Entry<ChannelOption<?>, Object> e: childOptions) {
try {
if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
logger.warn("Unknown channel option: " + e);
}
} catch (Throwable t) {
logger.warn("Failed to set a channel option: " + child, t);
}
}
for (Entry<AttributeKey<?>, Object> e: childAttrs) {
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
try {
childGroup.register(child);
} catch (Throwable t) {
child.unsafe().closeForcibly();
logger.warn("Failed to register an accepted channel: " + child, t);
}
}
首先child.pipeline().addLast(childHandler)將服務端main函式中例項化的ChannelInitializer加入到管道中,該處理器的initChannel方法會在channelRegistered事件觸發時被呼叫
childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(
//new LoggingHandler(LogLevel.INFO),
new EchoServerHandler());
}
});
然後設定NioSocketchannel的一些屬性,最後進行註冊:childGroup.register(child)。
這裡採用的是childGroup,即worker執行緒池所在的Group,從Group中選擇一個NioEventLoop,並啟動其持有的worker執行緒,執行register0任務。
// AbstractUnsafe
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
if (eventLoop == null) {
throw new NullPointerException("eventLoop");
}
if (isRegistered()) {
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
if (!isCompatible(eventLoop)) {
promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
promise.setFailure(t);
}
}
}
private void register0(ChannelPromise promise) {
try {
// check if the channel is still open as it could be closed in the mean time when the register
// call was outside of the eventLoop
if (!ensureOpen(promise)) {
return;
}
doRegister();
registered = true;
promise.setSuccess();
pipeline.fireChannelRegistered();
if (isActive()) {
pipeline.fireChannelActive();
}
} catch (Throwable t) {
// Close the channel directly to avoid FD leak.
closeForcibly();
closeFuture.setClosed();
if (!promise.tryFailure(t)) {
logger.warn(
"Tried to fail the registration promise, but it is complete already. " +
"Swallowing the cause of the registration failure:", t);
}
}
}
此時worker執行緒就啟動了。Register0任務在connect文章中已經描述,其主要功能就是將socketchannel註冊到selector中;然後觸發channelRegistered事件,呼叫ChannelInitializer的initChannel方法將服務端main函式中設定的處理器(本例為EchoServerHandler)加入到管道中,並將自己ChannelInitializer從管道中移除;最後觸發channelActive事件,將ops設定為read。
// DefaultChannelPipeline
public ChannelPipeline fireChannelActive() {
head.fireChannelActive();
if (channel.config().isAutoRead()) {
channel.read();
}
return this;
}
到此,worker執行緒對應的selector就開始監聽該socketChannel上的read事件了。
接下來繼續分析boss執行緒的執行:
將本次readBuf中快取的所有NioSocketChannel註冊後,就將他們從readBuf中移除。然後觸發ChannelReadComplete事件,
// DefaultChannelPipeline
public ChannelPipeline fireChannelReadComplete() {
head.fireChannelReadComplete();
if (channel.config().isAutoRead()) {
read();
}
return this;
}
head.fireChannelReadComplete()觸發的是一個inbound事件,沒有做任何事情。接著分析後續觸發的read事件,這是一個outbound事件,也沒有做任何事情(將ops重新設定為OP_ACCEPT,其實本來就是OP_ACCEPT)。
到此,一次accept的流程就執行完了。
總結:
一次accept的流程發生了以下事情:
- 為接受的客戶端連線建立一個已連線套接字,設定為非阻塞。基於已連線套接字例項化一個NioSocketChannel,設定readInterestOp為SelectionKey.OP_READ,為其建立管道,並例項化內部的NioByteUnsafe。
- 在觸發ServerSocketChannel的管道的channelRead方法之前,一個ServerSocketChannel一次可以最多快取maxMessagesPerRead(預設為16)個NioSocketChannel。
- channelRead是一個Inbound事件,做了以下幾件事:呼叫ServerBootstrapAcceptor處理器的channelRead方法為NioSocketChannel的管道加入ChannelInitializer處理器(該處理器的initChannel方法會在channalRegistered事件被觸發時呼叫,將EchoServerHandler加入到管道中);設定NioSocketChannel的屬性;從worker執行緒池中啟動一個worker執行緒,執行register0任務。
- register0任務做的事情是:將socketChannal註冊到selector中,觸發channelRegistered事件,呼叫ChannelInitializer的initChannel方法將main函式中設定的處理器(譬如:EchoServerHandler)加入到管道中,然後觸發channelActive事件,最後裡面觸發read事件,將ops設定為read。到此,worker執行緒所屬的NioEventLoop持有的selector就開始監聽socketChannel的read事件了。
- 最後觸發ChannelReadComplete(inbound)事件,裡面又會觸發read(outbound)事件,這兩個事件均沒有做任何實事。