「java」從websocket伺服器的啟動分析netty3.10原始碼
**
1.首先是建立bootstrap物件
**
// Configure the server.
ServerBootstrap bootstrap = new ServerBootstrap(new NioServerSocketChannelFactory(
Executors.newCachedThreadPool(), Executors.newCachedThreadPool()));
建立bootstrap物件的時候先建立一個頻道工廠ChannelFactory
,它會初始化boss執行緒和多個worker執行緒。
public NioServerSocketChannelFactory(
Executor bossExecutor, int bossCount, WorkerPool workerPool) {
this(new NioServerBossPool(bossExecutor, bossCount, null), workerPool);
}
worker執行緒的初始化方法在NioWorkerPool
的建構函式呼叫
NioServerBossPool的初始化方法會建立NioWorker物件,每個worker物件都會建立一個死鎖檢測worker執行緒,worker物件的數量由使用者定義
NioServerBoss構造方法會呼叫AbstractNioSelector.openSelector(determiner)
方法開啟多路複用器,同時呼叫DeadLockProofWorker.start()建立一個死鎖檢測worker執行緒,在這個執行緒中會通過使用者設定的執行器executor跑NioWorker物件(它是個runable)。
NioServerBossPool的構造方法最後呼叫waitForWorkerThreads()方法等待所有的worker執行緒初始化完畢,這是通過閉鎖來進行判斷的,閉鎖的完成條件是NioWorker物件的父方法run()執行起來後呼叫startupLatch.countDown()方法。
boss執行緒的初始化和worker差不多
**
2.繫結本地地址
**
public Channel bind(final SocketAddress localAddress) {
//bind的非同步操作結果物件(裡面會建立一個新的頻道)
ChannelFuture future = bindAsync(localAddress);
//阻塞到繫結結束 Wait for the future.
future.awaitUninterruptibly();
if (!future.isSuccess()) {
future.getChannel().close().awaitUninterruptibly();
throw new ChannelException("Failed to bind to: " + localAddress, future.getCause());
}
return future.getChannel();
}
public ChannelFuture bindAsync(final SocketAddress localAddress) {
if (localAddress == null) {
throw new NullPointerException(“localAddress”);
}
//本質是一個頻道處理器(專用於繫結頻道到指定的本地地址)
Binder binder = new Binder(localAddress);//繫結者
//獲取使用者設定的父頻道處理器
ChannelHandler parentHandler = getParentHandler();
//新建一個預設管道作為boss管道
ChannelPipeline bossPipeline = pipeline();
//新增繫結者到boss管道中
bossPipeline.addLast(“binder”, binder);
//使用者設定了父頻道處理器就加到boss管道中
if (parentHandler != null) {
bossPipeline.addLast(“userHandler”, parentHandler);
}
//通過使用者設定的頻道工廠建立一個持有boss管道的頻道
Channel channel = getFactory().newChannel(bossPipeline);
//結果future
final ChannelFuture bfuture = new DefaultChannelFuture(channel, false);
//添加回調以得到結果
binder.bindFuture.addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
bfuture.setSuccess();
} else {
// Call close on bind failure
bfuture.getChannel().close();
bfuture.setFailure(future.getCause());
}
}
});
return bfuture;
}
在通過頻道工廠NioServerSocketChannelFactory建立頻道物件Channel的時候,NioServerSocketChannel的建構函式最後會呼叫fireChannelOpen(this)方法
public static void fireChannelOpen(Channel channel) {
// Notify the parent handler.
if (channel.getParent() != null) {
fireChildChannelStateChanged(channel.getParent(), channel);
}
channel.getPipeline().sendUpstream(
new UpstreamChannelStateEvent(
channel, ChannelState.OPEN, Boolean.TRUE));
}
可以看到在這裡管道pipeline的sendUpstream()方法被呼叫,它會讓管道持有的處理器handler物件順著連結串列依次執行handleUpstream()方法。
而Binder是第一個註冊的,所以它當然是第一個執行sendUpstream()方法(這是Binder父類SimpleChannelUpstreamHandler的方法)。最後會呼叫Binder的channelOpen()方法
private final class Binder extends SimpleChannelUpstreamHandler {
private final SocketAddress localAddress;
private final Map<String, Object> childOptions =
new HashMap<String, Object>();
private final DefaultChannelFuture bindFuture = new DefaultChannelFuture(null, false);
Binder(SocketAddress localAddress) {
this.localAddress = localAddress;
}
@Override
public void channelOpen(
ChannelHandlerContext ctx,
ChannelStateEvent evt) {
try {
evt.getChannel().getConfig().setPipelineFactory(getPipelineFactory());
// Split options into two categories: parent and child.
Map<String, Object> allOptions = getOptions();
Map<String, Object> parentOptions = new HashMap<String, Object>();
for (Entry<String, Object> e: allOptions.entrySet()) {
if (e.getKey().startsWith("child.")) {
childOptions.put(
e.getKey().substring(6),
e.getValue());
} else if (!"pipelineFactory".equals(e.getKey())) {
parentOptions.put(e.getKey(), e.getValue());
}
}
// Apply parent options.
evt.getChannel().getConfig().setOptions(parentOptions);
} finally {
//呼叫下一個handler的sendUpstream()方法
ctx.sendUpstream(evt);
}
//呼叫頻道Channel的bind()方法繫結本地地址
evt.getChannel().bind(localAddress).addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
bindFuture.setSuccess();
} else {
bindFuture.setFailure(future.getCause());
}
}
});
}
可以看到方法的結尾會呼叫頻道Channel的bind()方法繫結本地地址。最後他會呼叫ChannelPipelin的sendDownStream()方法。
public void sendDownstream(ChannelEvent e) {
DefaultChannelHandlerContext tail = getActualDownstreamContext(this.tail);
if (tail == null) {
try {
getSink().eventSunk(this, e);
return;
} catch (Throwable t) {
notifyHandlerException(e, t);
return;
}
}
sendDownstream(tail, e);
}
如果有DownstreamHandler,它會讓管道持有的處理器handler物件順著連結串列依次執行handleDownstream()方法。而沒有的時候就是執行ChannelSink的eventSunk()方法。這個sink是ChannelFactory建立channel建立的NioServerSocketPipelineSink。
public void eventSunk(
ChannelPipeline pipeline, ChannelEvent e) throws Exception {
Channel channel = e.getChannel();
if (channel instanceof NioServerSocketChannel) {
handleServerSocket(e);
} else if (channel instanceof NioSocketChannel) {
handleAcceptedSocket(e);
}
}
接著呼叫handleServerSocket()方法處理繫結地址事件BOUND
private static void handleServerSocket(ChannelEvent e) {
if (!(e instanceof ChannelStateEvent)) {
return;
}
ChannelStateEvent event = (ChannelStateEvent) e;
NioServerSocketChannel channel =
(NioServerSocketChannel) event.getChannel();
ChannelFuture future = event.getFuture();
ChannelState state = event.getState();
Object value = event.getValue();
switch (state) {
case OPEN:
if (Boolean.FALSE.equals(value)) {
((NioServerBoss) channel.boss).close(channel, future);
}
break;
case BOUND:
if (value != null) {
((NioServerBoss) channel.boss).bind(channel, future, (SocketAddress) value);
} else {
((NioServerBoss) channel.boss).close(channel, future);
}
break;
default:
break;
}
}
最終呼叫到Boss的bind()方法,註冊一個任務到boss物件的任務佇列。這個任務會放到任務執行緒中進行處理。
void bind(final NioServerSocketChannel channel, final ChannelFuture future,
final SocketAddress localAddress) {
registerTask(new RegisterTask(channel, future, localAddress));
}
我們來看看這個任務做了什麼
public void run() {
boolean bound = false;
boolean registered = false;
try {
channel.socket.socket().bind(localAddress, channel.getConfig().getBacklog());
bound = true;
future.setSuccess();
fireChannelBound(channel, channel.getLocalAddress());
channel.socket.register(selector, SelectionKey.OP_ACCEPT, channel);
registered = true;
} catch (Throwable t) {
future.setFailure(t);
fireExceptionCaught(channel, t);
} finally {
if (!registered && bound) {
close(channel, future);
}
}
}
可以看到它是通過nio的SelectableChannel繫結本地埠,有興趣可以去了解下jdk的nio是怎麼實現的。
public void run() {
boolean bound = false;
boolean registered = false;
try {
//通過nio的SelectableChannel繫結本地埠
channel.socket.socket().bind(localAddress, channel.getConfig().getBacklog());
bound = true;
future.setSuccess();
//繫結完本地地址後在發一個BOUND事件給上行處理器
fireChannelBound(channel, channel.getLocalAddress());
channel.socket.register(selector, SelectionKey.OP_ACCEPT, channel);
registered = true;
} catch (Throwable t) {
future.setFailure(t);
fireExceptionCaught(channel, t);
} finally {
if (!registered && bound) {
close(channel, future);
}
}
}
至此伺服器基本啟動完成