Netty原始碼解讀------------客戶端接入繫結(二)
阿新 • • 發佈:2019-02-13
接下來繼續看下面這段程式碼:
//Binder裡面的程式碼
/**
 * Handles the channel-open event of the server channel: installs the pipeline
 * factory, splits the configured options into parent and child options,
 * applies the parent options, forwards the event upstream and finally
 * requests the bind to {@code localAddress}.
 *
 * @param ctx the handler context used to forward the event upstream
 * @param evt the channel-open state event
 */
public void channelOpen(
        ChannelHandlerContext ctx,
        ChannelStateEvent evt) {
    try {
        evt.getChannel().getConfig().setPipelineFactory(getPipelineFactory());

        // Keys prefixed with "child." go to childOptions (prefix stripped);
        // everything else except "pipelineFactory" is a parent option.
        Map<String, Object> parentOptions = new HashMap<String, Object>();
        for (Entry<String, Object> option : getOptions().entrySet()) {
            String key = option.getKey();
            if (key.startsWith("child.")) {
                // 6 == "child.".length()
                childOptions.put(key.substring(6), option.getValue());
                continue;
            }
            if (!key.equals("pipelineFactory")) {
                parentOptions.put(key, option.getValue());
            }
        }

        // Only the parent options are applied to the channel being opened.
        evt.getChannel().getConfig().setOptions(parentOptions);
    } finally {
        // The event is always propagated, even if option handling failed.
        ctx.sendUpstream(evt);
    }

    // evt.getChannel().bind(...) starts binding the local address; the
    // returned future is handed over through futureQueue.
    boolean finished = futureQueue.offer(evt.getChannel().bind(localAddress));
    assert finished;
}
/**
 * Requests that {@code channel} be bound to {@code localAddress} by sending
 * a downstream {@code BOUND} state event through the channel's pipeline.
 *
 * @param channel      the channel to bind
 * @param localAddress the local address to bind to; must not be {@code null}
 * @return the future created for this bind request
 * @throws NullPointerException if {@code localAddress} is {@code null}
 */
public static ChannelFuture bind(Channel channel, SocketAddress localAddress) {
    if (localAddress == null) {
        throw new NullPointerException("localAddress");
    }

    ChannelFuture bindFuture = future(channel);

    // The BOUND event carries the local address as its value and travels
    // downstream through the pipeline.
    channel.getPipeline().sendDownstream(
            new DownstreamChannelStateEvent(
                    channel, bindFuture, ChannelState.BOUND, localAddress));

    return bindFuture;
}
//DefaultChannelPipeline.java
/**
 * Sends an event downstream. If a downstream handler context exists the
 * event is passed to it; otherwise the event is handed directly to the sink.
 *
 * @param e the event to send downstream
 */
public void sendDownstream(ChannelEvent e) {
    DefaultChannelHandlerContext last = getActualDownstreamContext(this.tail);
    if (last != null) {
        sendDownstream(last, e);
        return;
    }

    // No downstream handler context: deliver the event straight to the sink.
    // (Per the article, for a server channel this sink is the
    // NioServerSocketPipelineSink installed on the channel.)
    try {
        getSink().eventSunk(this, e);
    } catch (Throwable t) {
        notifyHandlerException(e, t);
    }
}
//NioServerSocketPipelineSink.java 進入這個裡面
/**
 * Handles a downstream event addressed to a server socket channel. Only
 * {@link ChannelStateEvent}s are processed; anything else is ignored.
 *
 * <ul>
 *   <li>{@code OPEN} with value {@code Boolean.FALSE}: close the channel.</li>
 *   <li>{@code BOUND} with a non-null value: bind to that address.</li>
 *   <li>{@code BOUND} with a null value: close the channel.</li>
 * </ul>
 *
 * @param e the downstream event
 */
private void handleServerSocket(ChannelEvent e) {
    if (e instanceof ChannelStateEvent) {
        ChannelStateEvent stateEvent = (ChannelStateEvent) e;
        NioServerSocketChannel serverChannel =
                (NioServerSocketChannel) stateEvent.getChannel();
        ChannelFuture future = stateEvent.getFuture();
        ChannelState state = stateEvent.getState();
        Object value = stateEvent.getValue();

        switch (state) {
        case OPEN:
            if (Boolean.FALSE.equals(value)) {
                close(serverChannel, future);
            }
            break;
        case BOUND:
            if (value == null) {
                close(serverChannel, future);
            } else {
                // The event value is the local address to bind to.
                bind(serverChannel, future, (SocketAddress) value);
            }
            break;
        }
    }
}
/**
 * Binds the underlying server socket to {@code localAddress}, completes the
 * future, fires the bound event and starts a {@code Boss} task on the
 * factory's boss executor.
 *
 * <p>Any failure marks the future as failed and fires an exception event.
 * If the socket was bound but the boss task could not be started, the
 * channel is closed again.
 *
 * @param channel      the server channel to bind
 * @param future       the future to notify about the outcome
 * @param localAddress the address to bind to
 */
private void bind(
        NioServerSocketChannel channel, ChannelFuture future,
        SocketAddress localAddress) {

    boolean socketBound = false;
    boolean bossRunning = false;
    try {
        // Bind the java.net server socket with the configured backlog.
        channel.socket.socket().bind(localAddress, channel.getConfig().getBacklog());
        socketBound = true;

        future.setSuccess();
        // Announce the bound state to the pipeline.
        fireChannelBound(channel, channel.getLocalAddress());

        Executor bossExecutor =
                ((NioServerSocketChannelFactory) channel.getFactory()).bossExecutor;

        // The Boss constructor is where the accept registration happens;
        // the task is then submitted to the boss executor.
        ThreadRenamingRunnable bossTask = new ThreadRenamingRunnable(
                new Boss(channel),
                "New I/O server boss #" + id + " (" + channel + ')');
        DeadLockProofWorker.start(bossExecutor, bossTask);
        bossRunning = true;
    } catch (Throwable t) {
        future.setFailure(t);
        fireExceptionCaught(channel, t);
    } finally {
        // Bound but no boss to accept connections: undo by closing.
        if (socketBound && !bossRunning) {
            close(channel, future);
        }
    }
}
//Boss.java
/**
 * Creates the boss task for a server channel: opens a selector, registers
 * the channel's server socket with it for accept events, and stores the
 * selector on the channel.
 *
 * @param channel the Netty server channel whose socket is registered
 * @throws IOException declared by this constructor; Selector.open() and
 *         register(...) are the I/O calls made here
 */
Boss(NioServerSocketChannel channel) throws IOException {
// Netty's wrapper around the NIO server channel.
this.channel = channel;
// Open a java.nio selector for this boss.
selector = Selector.open();
// Tracks whether the registration below completed without throwing.
boolean registered = false;
try {
// Register the channel's NIO socket with the selector for OP_ACCEPT so
// that incoming client connections show up on this selector.
channel.socket.register(selector, SelectionKey.OP_ACCEPT);
registered = true;
} finally {
if (!registered) {
// Registration threw: clean up via closeSelector() (presumably closes
// the selector opened above; its body is not shown here).
closeSelector();
}
}
// Note: the same selector is also stored on the channel.
channel.selector = selector;
}
//上面的DeadLockProofWorker.start()有啟動這個執行緒
/**
 * Accept loop of the boss task (started through
 * {@code DeadLockProofWorker.start()}).
 *
 * <p>Repeatedly waits on the selector for up to one second, then tries to
 * accept a connection from the server socket and hands every accepted
 * socket to {@code registerAcceptedChannel}. The loop ends when accepting
 * raises a {@link ClosedChannelException}. The channel's shutdown lock is
 * held for the whole loop; on exit it is released and
 * {@code closeSelector()} is called.
 */
public void run() {
    final Thread currentThread = Thread.currentThread();

    // Held for the entire accept loop; released in the finally block below.
    channel.shutdownLock.lock();
    try {
        for (;;) {
            try {
                // Wait up to 1s; the selected keys are only cleared, because
                // accept() below is attempted on every iteration anyway.
                if (selector.select(1000) > 0) {
                    selector.selectedKeys().clear();
                }

                // Accept a pending client connection, if any. The null check
                // suggests the server socket is non-blocking, where accept()
                // returns null when nothing is pending (confirm against the
                // channel setup, which is not shown here).
                SocketChannel acceptedSocket = channel.socket.accept();
                if (acceptedSocket != null) {
                    // Register the newly accepted client with a worker.
                    // This call must be a statement of its own: if it is
                    // swallowed by a line comment, accepted sockets are
                    // never registered and are leaked.
                    registerAcceptedChannel(acceptedSocket, currentThread);
                }
            } catch (SocketTimeoutException e) {
                // Thrown every second to get ClosedChannelException
                // raised.
            } catch (CancelledKeyException e) {
                // Raised by accept() when the server socket was closed.
            } catch (ClosedSelectorException e) {
                // Raised by accept() when the server socket was closed.
            } catch (ClosedChannelException e) {
                // Closed as requested.
                break;
            } catch (Throwable e) {
                logger.warn(
                        "Failed to accept a connection.", e);
                // Back off for a second before retrying so a persistent
                // failure does not spin the loop.
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e1) {
                    // Ignore
                }
            }
        }
    } finally {
        channel.shutdownLock.unlock();
        closeSelector();
    }
}
/**
 * Wraps a freshly accepted socket in a {@code NioAcceptedSocketChannel} and
 * registers it with the next worker. If anything fails, the failure is
 * logged and the accepted socket is closed.
 *
 * @param acceptedSocket the socket returned by accept()
 * @param currentThread  the boss thread, passed on to the new channel
 */
private void registerAcceptedChannel(SocketChannel acceptedSocket, Thread currentThread) {
    try {
        // The pipeline factory stored in the channel config is the one the
        // application supplied, so getPipeline() is where user handlers are
        // plugged in, e.g.:
        //
        //   bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
        //       public ChannelPipeline getPipeline() throws Exception {
        //           return Channels.pipeline(
        //                   new StringDecoder(), new StringEncoder(), new ServerHandler());
        //       }
        //   });
        ChannelPipeline pipeline =
                channel.getConfig().getPipelineFactory().getPipeline();

        NioWorker worker = nextWorker();

        NioAcceptedSocketChannel acceptedChannel = new NioAcceptedSocketChannel(
                channel.getFactory(), pipeline, channel,
                NioServerSocketPipelineSink.this, acceptedSocket,
                worker, currentThread);

        // Hand the new channel to the worker; no future is supplied.
        worker.register(acceptedChannel, null);
    } catch (Exception cause) {
        logger.warn(
                "Failed to initialize an accepted socket.", cause);
        try {
            acceptedSocket.close();
        } catch (IOException closeFailure) {
            logger.warn(
                    "Failed to close a partially accepted socket.",
                    closeFailure);
        }
    }
}
//NioWorker.java
/**
 * Queues a registration task for the given channel on this worker, starting
 * the worker (selector plus worker thread) first if it has not been started.
 *
 * @param channel the socket channel to register with this worker
 * @param future  passed through to the RegisterTask; callers may pass null
 *                (registerAcceptedChannel does)
 * @throws ChannelException if a selector cannot be opened
 */
void register(NioSocketChannel channel, ChannelFuture future) {
// Anything that is not a client socket channel is treated as server-side.
boolean server = !(channel instanceof NioClientSocketChannel);
// The channel, future and server flag are bundled into a RegisterTask that
// is queued below.
Runnable registerTask = new RegisterTask(channel, future, server);
// Local reference to the selector, used for the wakeup after the lock is released.
Selector selector;
synchronized (startStopLock) {
// Note: this branch is taken only while `started` is still false;
// `started` is set to true further down in this synchronized block.
if (!started) {
// Open a selector if this worker didn't start yet.
try {
// Open the selector and keep it in both the field and the local.
this.selector = selector = Selector.open();
} catch (Throwable t) {
throw new ChannelException(
"Failed to create a selector.", t);
}
// Start the worker thread with the new Selector.
String threadName =
(server ? "New I/O server worker #"
: "New I/O client worker #") + bossId + '-' + id;
boolean success = false;
try {
DeadLockProofWorker.start(
executor, new ThreadRenamingRunnable(this, threadName));
success = true;
} finally {
if (!success) {
// Release the Selector if the execution fails.
try {
selector.close();
} catch (Throwable t) {
logger.warn("Failed to close a selector.", t);
}
this.selector = selector = null;
// The method will return to the caller at this point.
}
}
} else {
// Use the existing selector if this worker has been started.
selector = this.selector;
}
assert selector != null && selector.isOpen();
started = true;
// Put the registration task into registerTaskQueue.
boolean offered = registerTaskQueue.offer(registerTask);
assert offered;
}
// Wake the selector only if the wakenUp flag flips from false to true here.
if (wakenUp.compareAndSet(false, true)) {
selector.wakeup();
}
}
今天就暫時說到這裡,有什麼不明白的地方,可以問我。