
Java NIO Framework Netty Tutorial (5)

Weekends are the best time to study, but this weekend I was taking delivery of my new apartment, so naturally there was plenty to do. For the next few weekends I probably won't have much time to study; I'll have to squeeze it in where I can.

Enough small talk; back to work. Last time we used code to understand how Netty passes data streams underneath, but only at an intuitive level: it showed how to use the framework and what to watch out for while doing so. Some details were left untouched. For example, in the code from "Java NIO Framework Netty Tutorial (4) - ChannelBuffer", we sent messages like this:

private void sendMessageByFrame(ChannelStateEvent e) {
    String msgOne = "Hello, ";
    String msgTwo = "I'm ";
    String msgThree = "client.";
    e.getChannel().write(tranStr2Buffer(msgOne));
    e.getChannel().write(tranStr2Buffer(msgTwo));
    e.getChannel().write(tranStr2Buffer(msgThree));
}

In this way the client sends three messages back to back. But if you count the receives on the server side, you will find that most of the time only two receive events arrive, even though the message content itself is complete. Others have reported the same thing online: send 10,000 messages in a row, and the number received is often 999x; in short, the event count never quite matches the message count. Why is that? The only way I could find the reason was to read Netty's source code, so let's work through it together.
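To reproduce the observation, the server-side handler only needs to count how many times messageReceived fires and how many bytes arrive in total. A minimal sketch against the Netty 3.x API (the class name CountingServerHandler is mine, not from the original code):

import java.util.concurrent.atomic.AtomicInteger;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;

public class CountingServerHandler extends SimpleChannelUpstreamHandler {

    private final AtomicInteger events = new AtomicInteger();
    private final AtomicInteger bytes = new AtomicInteger();

    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
        ChannelBuffer buffer = (ChannelBuffer) e.getMessage();
        // One event may carry the payload of several client-side write() calls,
        // so the event count can be smaller than the number of writes.
        System.out.println("event #" + events.incrementAndGet()
                + ", total bytes received: " + bytes.addAndGet(buffer.readableBytes()));
    }
}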

The natural starting point is the e.getChannel().write() method. Following the call chain, we first arrive at AbstractNioWorker.java:

protected void write0(AbstractNioChannel<?> channel) {
    boolean open = true;
    boolean addOpWrite = false;
    boolean removeOpWrite = false;
    boolean iothread = isIoThread(channel);

    long writtenBytes = 0;

    final SocketSendBufferPool sendBufferPool = this.sendBufferPool;
    final WritableByteChannel ch = channel.channel;
    final Queue<MessageEvent> writeBuffer = channel.writeBufferQueue;
    final int writeSpinCount = channel.getConfig().getWriteSpinCount();
    synchronized (channel.writeLock) {
        channel.inWriteNowLoop = true;
        for (;;) {
            MessageEvent evt = channel.currentWriteEvent;
            SendBuffer buf;
            if (evt == null) {
                if ((channel.currentWriteEvent = evt = writeBuffer.poll()) == null) {
                    removeOpWrite = true;
                    channel.writeSuspended = false;
                    break;
                }

                channel.currentWriteBuffer = buf = sendBufferPool.acquire(evt.getMessage());
            } else {
                buf = channel.currentWriteBuffer;
            }

            ChannelFuture future = evt.getFuture();
            try {
                long localWrittenBytes = 0;
                for (int i = writeSpinCount; i > 0; i --) {
                    localWrittenBytes = buf.transferTo(ch);
                    if (localWrittenBytes != 0) {
                        writtenBytes += localWrittenBytes;
                        break;
                    }
                    if (buf.finished()) {
                        break;
                    }
                }

                if (buf.finished()) {
                    // Successful write - proceed to the next message.
                    buf.release();
                    channel.currentWriteEvent = null;
                    channel.currentWriteBuffer = null;
                    evt = null;
                    buf = null;
                    future.setSuccess();
                } else {
                    // Not written fully - perhaps the kernel buffer is full.
                    addOpWrite = true;
                    channel.writeSuspended = true;

                    if (localWrittenBytes > 0) {
                        // Notify progress listeners if necessary.
                        future.setProgress(
                                localWrittenBytes,
                                buf.writtenBytes(), buf.totalBytes());
                    }
                    break;
                }
            } catch (AsynchronousCloseException e) {
                // Doesn't need a user attention - ignore.
            } catch (Throwable t) {
                if (buf != null) {
                    buf.release();
                }
                channel.currentWriteEvent = null;
                channel.currentWriteBuffer = null;
                buf = null;
                evt = null;
                future.setFailure(t);
                if (iothread) {
                    fireExceptionCaught(channel, t);
                } else {
                    fireExceptionCaughtLater(channel, t);
                }
                if (t instanceof IOException) {
                    open = false;
                    close(channel, succeededFuture(channel));
                }
            }
        }
        channel.inWriteNowLoop = false;

        // Initially, the following block was executed after releasing
        // the writeLock, but there was a race condition, and it has to be
        // executed before releasing the writeLock:
        //
        //     https://issues.jboss.org/browse/NETTY-410
        //
        if (open) {
            if (addOpWrite) {
                setOpWrite(channel);
            } else if (removeOpWrite) {
                clearOpWrite(channel);
            }
        }
    }
    if (iothread) {
        fireWriteComplete(channel, writtenBytes);
    } else {
        fireWriteCompleteLater(channel, writtenBytes);
    }
}

Here, buf.transferTo(ch) calls the write method of the underlying WritableByteChannel, writing the buffer into the channel and passing it on. In the debugger you can see that every call to this method makes the server-side messageReceived hit its breakpoint once. Of course, that is only the surface, or rather, exactly what was expected: from the start I suspected the problem was caused by writing too quickly in succession, so I tried pausing for one second after each write before issuing the next one, and then everything behaved normally.
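That pause test is easy to reproduce: insert a Thread.sleep() between the writes, and each write then arrives as its own receive event on the server. A rough sketch of that variant (it assumes the same tranStr2Buffer helper as above; the method name is mine):

// Paced variant of sendMessageByFrame used for the pause test: write, wait, write again.
private void sendMessageByFrameSlowly(ChannelStateEvent e) throws InterruptedException {
    String[] parts = { "Hello, ", "I'm ", "client." };
    for (String part : parts) {
        e.getChannel().write(tranStr2Buffer(part));
        Thread.sleep(1000); // give the server time to drain its buffer before the next write
    }
}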

So what did we gain by tracing this far? My approach was to first prove that the problem is not caused by writes overwriting each other on the sending side, so the cause must be sought on the read side. I also added a counter here to check how many times transferTo is actually called, and it is indeed 3 times:

for (int i = writeSpinCount; i > 0; i --) {
    localWrittenBytes = buf.transferTo(ch);
    System.out.println(++count);
}

Next, let's look for the cause on the receiving end, in NioWorker's read method, implemented as follows:

@Override
    protected boolean read(SelectionKey k) {
        final SocketChannel ch = (SocketChannel) k.channel();
        final NioSocketChannel channel = (NioSocketChannel) k.attachment();

        final ReceiveBufferSizePredictor predictor =
            channel.getConfig().getReceiveBufferSizePredictor();
        final int predictedRecvBufSize = predictor.nextReceiveBufferSize();

        int ret = 0;
        int readBytes = 0;
        boolean failure = true;

        ByteBuffer bb = recvBufferPool.acquire(predictedRecvBufSize);
        try {
            while ((ret = ch.read(bb)) > 0) {
                readBytes += ret;
                if (!bb.hasRemaining()) {
                    break;
                }
            }
            failure = false;
        } catch (ClosedChannelException e) {
            // Can happen, and does not need a user attention.
        } catch (Throwable t) {
            fireExceptionCaught(channel, t);
        }

        if (readBytes > 0) {
            bb.flip();

            final ChannelBufferFactory bufferFactory =
                channel.getConfig().getBufferFactory();
            final ChannelBuffer buffer = bufferFactory.getBuffer(readBytes);
            buffer.setBytes(0, bb);
            buffer.writerIndex(readBytes);

            recvBufferPool.release(bb);

            // Update the predictor.
            predictor.previousReceiveBufferSize(readBytes);

            // Fire the event.
            fireMessageReceived(channel, buffer);
        } else {
            recvBufferPool.release(bb);
        }

        if (ret < 0 || failure) {
            k.cancel(); // Some JDK implementations run into an infinite loop without this.
            close(channel, succeededFuture(channel));
            return false;
        }

        return true;
    }

The caller of this method is an outer loop that iterates continuously; whenever a SelectionKey is ready, it enters this method and reads the data out of the buffer. The SelectionKey here merely identifies the kind of readiness; this touches on the Selector mechanism in Java NIO, an important piece of Netty's foundations that I plan to cover in the next installment. The messageReceived event is fired only after everything currently sitting in the receive buffer has been read. That explains why the messages are complete even though we receive fewer events than we sent.
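The outer loop the text refers to is a standard java.nio Selector loop: block on select(), walk the selected keys, and call the read method above for every key that is readable. A simplified, free-standing sketch of that pattern follows; it is plain JDK NIO, not Netty's actual worker code, and read(k) stands for the method shown above:

import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;

// Simplified selector loop: Netty's NioWorker runs something conceptually similar.
void runSelectLoop(Selector selector) throws Exception {
    while (true) {
        selector.select();                       // block until at least one channel is ready
        Iterator<SelectionKey> it = selector.selectedKeys().iterator();
        while (it.hasNext()) {
            SelectionKey k = it.next();
            it.remove();
            if (k.isReadable()) {
                read(k);                         // drain everything currently readable,
            }                                    // then fire a single messageReceived
        }
    }
}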

From what we have seen so far, Netty transfers data through Java's NIO mechanism, and reading and writing are not strictly bound to events. The data exists independently as a stream, and both the read side and the write side go through a buffer pool. Still, this is far from resolving all of my doubts. I have decided to first study the Selector mechanism and then come back to this question.

To be resolved… If you know the answer, I eagerly look forward to your guidance.