Java NIO框架Netty教程(五)
週末是最好的學習時間,不過這週末收房子,可想而知事情自然也不會少。這段時間的週末,可能很少有時間學習了。見縫插針吧。
不說廢話了,好好學習。上回通過程式碼理解了Netty底層資訊的流的傳遞機制,不過只是一個感性上的認識。教會你應該如何使用和使用的時候應該注意的方面。但是有一些細節的問題,並沒有提及。比如在《Java NIO框架Netty教程(四)- ChannelBuffer》的程式碼裡,我們通過:
private void sendMessageByFrame(ChannelStateEvent e) {
String msgOne = "Hello, ";
String msgTwo = "I'm ";
String msgThree = "client.";
e.getChannel().write(tranStr2Buffer(msgOne));
e.getChannel().write(tranStr2Buffer(msgTwo));
e.getChannel().write(tranStr2Buffer(msgThree));
}
這樣的方式,連續返送三次訊息。但是如果你在服務端進行接收計數的話,你會發現,大部分時候都是接收到兩次的事件請求。不過訊息都是完整的。網上也有人提到過,進行10000次的連續放鬆,往往接受到的訊息個數是999X的,總是就是訊息數目上不匹配,這又是為何呢?筆者也只能通過閱讀Netty
起點自然是選擇在e.getChannel().writer()方法上。一路跟蹤首先來到了:AbstractNioWorker.java類
protected void write0(AbstractNioChannel<?> channel) {
boolean open = true;
boolean addOpWrite = false;
boolean removeOpWrite = false;
boolean iothread = isIoThread(channel );
long writtenBytes = 0;
final SocketSendBufferPool sendBufferPool = this.sendBufferPool;
final WritableByteChannel ch = channel.channel;
final Queue<MessageEvent> writeBuffer = channel.writeBufferQueue;
final int writeSpinCount = channel.getConfig().getWriteSpinCount();
synchronized (channel.writeLock) {
channel.inWriteNowLoop = true;
for (;;) {
MessageEvent evt = channel.currentWriteEvent;
SendBuffer buf;
if (evt == null) {
if ((channel.currentWriteEvent = evt = writeBuffer.poll()) == null) {
removeOpWrite = true;
channel.writeSuspended = false;
break;
}
channel.currentWriteBuffer = buf = sendBufferPool.acquire(evt.getMessage());
} else {
buf = channel.currentWriteBuffer;
}
ChannelFuture future = evt.getFuture();
try {
long localWrittenBytes = 0;
for (int i = writeSpinCount; i > 0; i --) {
localWrittenBytes = buf.transferTo(ch);
if (localWrittenBytes != 0) {
writtenBytes += localWrittenBytes;
break;
}
if (buf.finished()) {
break;
}
}
if (buf.finished()) {
// Successful write - proceed to the next message.
buf.release();
channel.currentWriteEvent = null;
channel.currentWriteBuffer = null;
evt = null;
buf = null;
future.setSuccess();
} else {
// Not written fully - perhaps the kernel buffer is full.
addOpWrite = true;
channel.writeSuspended = true;
if (localWrittenBytes > 0) {
// Notify progress listeners if necessary.
future.setProgress(
localWrittenBytes,
buf.writtenBytes(), buf.totalBytes());
}
break;
}
} catch (AsynchronousCloseException e) {
// Doesn't need a user attention - ignore.
} catch (Throwable t) {
if (buf != null) {
buf.release();
}
channel.currentWriteEvent = null;
channel.currentWriteBuffer = null;
buf = null;
evt = null;
future.setFailure(t);
if (iothread) {
fireExceptionCaught(channel, t);
} else {
fireExceptionCaughtLater(channel, t);
}
if (t instanceof IOException) {
open = false;
close(channel, succeededFuture(channel));
}
}
}
channel.inWriteNowLoop = false;
// Initially, the following block was executed after releasing
// the writeLock, but there was a race condition, and it has to be
// executed before releasing the writeLock:
//
// https://issues.jboss.org/browse/NETTY-410
//
if (open) {
if (addOpWrite) {
setOpWrite(channel);
} else if (removeOpWrite) {
clearOpWrite(channel);
}
}
}
if (iothread) {
fireWriteComplete(channel, writtenBytes);
} else {
fireWriteCompleteLater(channel, writtenBytes);
}
}
這裡,buf.transferTo(ch)就是呼叫底層WritableByteChannel的write方法,把buffer寫到管道里,傳遞過去。通過Debug可以看到,每呼叫一次這個方法,服務端的messageReceived方法就會進入斷點一次。當然這個也只是表相,或者說也是在預料之內的。因為筆者從開始就懷疑是連續寫入過快導致的問題,所以測試過每次write後停頓1秒。再write下一次。結果一切正常。
那麼我們跟到這裡的意義何在呢?筆者的思路是先證明不是在write端出現的寫覆蓋的問題,這樣就可以從read端尋找問題。這裡筆者也在這裡加入了一個計數,測試究竟transferTo了幾次。結果確實是3次。
for (int i = writeSpinCount; i > 0; i --) {
localWrittenBytes = buf.transferTo(ch);
System.out.println(++count)
}
接下來就從接收端找找原因,在NioWorker的read方法,實現如下:
@Override
protected boolean read(SelectionKey k) {
final SocketChannel ch = (SocketChannel) k.channel();
final NioSocketChannel channel = (NioSocketChannel) k.attachment();
final ReceiveBufferSizePredictor predictor =
channel.getConfig().getReceiveBufferSizePredictor();
final int predictedRecvBufSize = predictor.nextReceiveBufferSize();
int ret = 0;
int readBytes = 0;
boolean failure = true;
ByteBuffer bb = recvBufferPool.acquire(predictedRecvBufSize);
try {
while ((ret = ch.read(bb)) > 0) {
readBytes += ret;
if (!bb.hasRemaining()) {
break;
}
}
failure = false;
} catch (ClosedChannelException e) {
// Can happen, and does not need a user attention.
} catch (Throwable t) {
fireExceptionCaught(channel, t);
}
if (readBytes > 0) {
bb.flip();
final ChannelBufferFactory bufferFactory =
channel.getConfig().getBufferFactory();
final ChannelBuffer buffer = bufferFactory.getBuffer(readBytes);
buffer.setBytes(0, bb);
buffer.writerIndex(readBytes);
recvBufferPool.release(bb);
// Update the predictor.
predictor.previousReceiveBufferSize(readBytes);
// Fire the event.
fireMessageReceived(channel, buffer);
} else {
recvBufferPool.release(bb);
}
if (ret < 0 || failure) {
k.cancel(); // Some JDK implementations run into an infinite loop without this.
close(channel, succeededFuture(channel));
return false;
}
return true;
}
在這個方法的外層是一個迴圈,不停的遍歷,如果有SelectionKey存在,則進入此方法讀取buffer中的資料。這個SelectionKey區分只是一種型別,這個設計到Java NIO中的Seletor機制,這個筆者準備下講穿插一下。屬於Netty底層的一個重要的機制。messageReceived事件的觸發,是在讀取完當前緩衝池中所有的資訊之後在觸發的。這倒是可以解釋,為什麼即使我們收到事件的次數少,但是訊息是完整的。
從目前來看,Netty通過Java 的NIO機制傳遞資料,資料讀寫跟事件沒有嚴格的繫結機制。資料是以流的形式獨立存在,讀寫都有一個緩衝池。不過,這些還遠未解決筆者的疑惑。筆者決定先了解一下Seletor機制,再回頭來探索這個問題。
待解決……如果您知道,熱切期待您的指導。