1. 程式人生 > >「java」從websocket伺服器的啟動分析netty3.10原始碼

「java」從websocket伺服器的啟動分析netty3.10原始碼

**

1.首先是建立bootstrap物件

**
// Configure the server.
ServerBootstrap bootstrap = new ServerBootstrap(new NioServerSocketChannelFactory(
Executors.newCachedThreadPool(), Executors.newCachedThreadPool()));
建立bootstrap物件的時候先建立一個頻道工廠ChannelFactory,它會初始化boss執行緒和多個worker執行緒。

public NioServerSocketChannelFactory(
Executor bossExecutor, int bossCount, WorkerPool workerPool) {
this(new NioServerBossPool(bossExecutor, bossCount, null), workerPool);
}

worker執行緒的初始化方法在NioWorkerPool的建構函式呼叫

NioServerBossPool的初始化方法會建立NioWorker物件,每個worker物件都會建立一個死鎖檢測worker執行緒,worker物件的數量由使用者定義

NioServerBoss構造方法會呼叫AbstractNioSelector.openSelector(determiner)方法開啟多路複用器,同時呼叫DeadLockProofWorker.start()建立一個死鎖檢測worker執行緒,在這個執行緒中會通過使用者設定的執行器executor跑NioWorker物件(它是個runable)。

NioServerBossPool的構造方法最後呼叫waitForWorkerThreads()方法等待所有的worker執行緒初始化完畢,這是通過閉鎖來進行判斷的,閉鎖的完成條件是NioWorker物件的父方法run()執行起來後呼叫startupLatch.countDown()方法。

boss執行緒的初始化和worker差不多

**

2.繫結本地地址

**

public Channel bind(final SocketAddress localAddress) {
//bind的非同步操作結果物件(裡面會建立一個新的頻道)
ChannelFuture future = bindAsync(localAddress);

//阻塞到繫結結束 Wait for the future.
future.awaitUninterruptibly();
if (!future.isSuccess()) {
    future.getChannel().close().awaitUninterruptibly();
    throw new ChannelException("Failed to bind to: " + localAddress, future.getCause());
}

return future.getChannel();

}

public ChannelFuture bindAsync(final SocketAddress localAddress) {
if (localAddress == null) {
throw new NullPointerException(“localAddress”);
}
//本質是一個頻道處理器(專用於繫結頻道到指定的本地地址)
Binder binder = new Binder(localAddress);//繫結者
//獲取使用者設定的父頻道處理器
ChannelHandler parentHandler = getParentHandler();
//新建一個預設管道作為boss管道
ChannelPipeline bossPipeline = pipeline();
//新增繫結者到boss管道中
bossPipeline.addLast(“binder”, binder);
//使用者設定了父頻道處理器就加到boss管道中
if (parentHandler != null) {
bossPipeline.addLast(“userHandler”, parentHandler);
}

//通過使用者設定的頻道工廠建立一個持有boss管道的頻道
Channel channel = getFactory().newChannel(bossPipeline);

//結果future
final ChannelFuture bfuture = new DefaultChannelFuture(channel, false);
//添加回調以得到結果
binder.bindFuture.addListener(new ChannelFutureListener() {
    public void operationComplete(ChannelFuture future) throws Exception {
        if (future.isSuccess()) {
            bfuture.setSuccess();
        } else {
            // Call close on bind failure
            bfuture.getChannel().close();
            bfuture.setFailure(future.getCause());
        }
    }
});
return bfuture;

}
在通過頻道工廠NioServerSocketChannelFactory建立頻道物件Channel的時候,NioServerSocketChannel的建構函式最後會呼叫fireChannelOpen(this)方法

public static void fireChannelOpen(Channel channel) {
// Notify the parent handler.
if (channel.getParent() != null) {
fireChildChannelStateChanged(channel.getParent(), channel);
}

channel.getPipeline().sendUpstream(
        new UpstreamChannelStateEvent(
                channel, ChannelState.OPEN, Boolean.TRUE));

}
可以看到在這裡管道pipeline的sendUpstream()方法被呼叫,它會讓管道持有的處理器handler物件順著連結串列依次執行handleUpstream()方法。

而Binder是第一個註冊的,所以它當然是第一個執行sendUpstream()方法(這是Binder父類SimpleChannelUpstreamHandler的方法)。最後會呼叫Binder的channelOpen()方法

private final class Binder extends SimpleChannelUpstreamHandler {

private final SocketAddress localAddress;
private final Map<String, Object> childOptions =
    new HashMap<String, Object>();
private final DefaultChannelFuture bindFuture = new DefaultChannelFuture(null, false);
Binder(SocketAddress localAddress) {
    this.localAddress = localAddress;
}

@Override
public void channelOpen(
        ChannelHandlerContext ctx,
        ChannelStateEvent evt) {

    try {
        evt.getChannel().getConfig().setPipelineFactory(getPipelineFactory());

        // Split options into two categories: parent and child.
        Map<String, Object> allOptions = getOptions();
        Map<String, Object> parentOptions = new HashMap<String, Object>();
        for (Entry<String, Object> e: allOptions.entrySet()) {
            if (e.getKey().startsWith("child.")) {
                childOptions.put(
                        e.getKey().substring(6),
                        e.getValue());
            } else if (!"pipelineFactory".equals(e.getKey())) {
                parentOptions.put(e.getKey(), e.getValue());
            }
        }

        // Apply parent options.
        evt.getChannel().getConfig().setOptions(parentOptions);
    } finally {
        //呼叫下一個handler的sendUpstream()方法
        ctx.sendUpstream(evt);
    }
    //呼叫頻道Channel的bind()方法繫結本地地址
    evt.getChannel().bind(localAddress).addListener(new ChannelFutureListener() {
        public void operationComplete(ChannelFuture future) throws Exception {
            if (future.isSuccess()) {
                bindFuture.setSuccess();
            } else {
                bindFuture.setFailure(future.getCause());
            }
        }
    });
}

可以看到方法的結尾會呼叫頻道Channel的bind()方法繫結本地地址。最後他會呼叫ChannelPipelin的sendDownStream()方法。

public void sendDownstream(ChannelEvent e) {
DefaultChannelHandlerContext tail = getActualDownstreamContext(this.tail);
if (tail == null) {
try {
getSink().eventSunk(this, e);
return;
} catch (Throwable t) {
notifyHandlerException(e, t);
return;
}
}

sendDownstream(tail, e);

}

如果有DownstreamHandler,它會讓管道持有的處理器handler物件順著連結串列依次執行handleDownstream()方法。而沒有的時候就是執行ChannelSink的eventSunk()方法。這個sink是ChannelFactory建立channel建立的NioServerSocketPipelineSink。

public void eventSunk(
ChannelPipeline pipeline, ChannelEvent e) throws Exception {
Channel channel = e.getChannel();
if (channel instanceof NioServerSocketChannel) {
handleServerSocket(e);
} else if (channel instanceof NioSocketChannel) {
handleAcceptedSocket(e);
}
}

接著呼叫handleServerSocket()方法處理繫結地址事件BOUND

private static void handleServerSocket(ChannelEvent e) {
if (!(e instanceof ChannelStateEvent)) {
return;
}

ChannelStateEvent event = (ChannelStateEvent) e;
NioServerSocketChannel channel =
    (NioServerSocketChannel) event.getChannel();
ChannelFuture future = event.getFuture();
ChannelState state = event.getState();
Object value = event.getValue();

switch (state) {
case OPEN:
    if (Boolean.FALSE.equals(value)) {
        ((NioServerBoss) channel.boss).close(channel, future);
    }
    break;
case BOUND:
    if (value != null) {
        ((NioServerBoss) channel.boss).bind(channel, future, (SocketAddress) value);
    } else {
        ((NioServerBoss) channel.boss).close(channel, future);
    }
    break;
default:
    break;
}

}
最終呼叫到Boss的bind()方法,註冊一個任務到boss物件的任務佇列。這個任務會放到任務執行緒中進行處理。
void bind(final NioServerSocketChannel channel, final ChannelFuture future,
final SocketAddress localAddress) {
registerTask(new RegisterTask(channel, future, localAddress));
}
我們來看看這個任務做了什麼

public void run() {
boolean bound = false;
boolean registered = false;
try {
channel.socket.socket().bind(localAddress, channel.getConfig().getBacklog());
bound = true;

    future.setSuccess();
    fireChannelBound(channel, channel.getLocalAddress());
    channel.socket.register(selector, SelectionKey.OP_ACCEPT, channel);

    registered = true;
} catch (Throwable t) {
    future.setFailure(t);
    fireExceptionCaught(channel, t);
} finally {
    if (!registered && bound) {
        close(channel, future);
    }
}

}
可以看到它是通過nio的SelectableChannel繫結本地埠,有興趣可以去了解下jdk的nio是怎麼實現的。

public void run() {
boolean bound = false;
boolean registered = false;
try {
//通過nio的SelectableChannel繫結本地埠
channel.socket.socket().bind(localAddress, channel.getConfig().getBacklog());
bound = true;

    future.setSuccess();
    //繫結完本地地址後在發一個BOUND事件給上行處理器
    fireChannelBound(channel, channel.getLocalAddress());
    channel.socket.register(selector, SelectionKey.OP_ACCEPT, channel);

    registered = true;
} catch (Throwable t) {
    future.setFailure(t);
    fireExceptionCaught(channel, t);
} finally {
    if (!registered && bound) {
        close(channel, future);
    }
}

}
至此伺服器基本啟動完成