1. 程式人生 > >【8】netty4原始碼分析-flush

【8】netty4原始碼分析-flush

轉自 http://xw-z1985.iteye.com/blog/1971904

Netty的寫操作由兩個步驟組成:

Write:將msg儲存到ChannelOutboundBuffer中
Flush:將msg從ChannelOutboundBuffer中flush到套接字的傳送緩衝區中。
上一篇文章分析了write,本文接著分析第二步flush:

//DefaultChannelHandlerContext  
public ChannelHandlerContext flush() {  
        final DefaultChannelHandlerContext next = findContextOutbound();  
        EventExecutor executor = next.executor();  
        if (executor.inEventLoop()) {  
            next.invokeFlush();  
        } else {  
            Runnable task = next.invokeFlushTask;  
            if (task == null) {  
                next.invokeFlushTask = task = new Runnable() {  
                    @Override  
                    public void run() {  
                        next.invokeFlush();  
                    }  
                };  
            }  
            executor.execute(task);  
        }  
  
        return this;  
    }  
  
    private void invokeFlush() {  
        try {  
            ((ChannelOutboundHandler) handler).flush(this);  
        } catch (Throwable t) {  
            notifyHandlerException(t);  
        }  
    }  

由於flush是Outbound事件,所以會呼叫headHandler的flush方法

//HeadHandler          
public void flush(ChannelHandlerContext ctx) throws Exception {  
            unsafe.flush();  
        }  

HeadHandler呼叫abstractUnsafe的flush方法

//AbstractUnsafe  
public void flush() {  
            ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;  
            if (outboundBuffer == null) {  
                return;  
            }  
            outboundBuffer.addFlush();  
            flush0();  
        }  

outboundBuffer 是之前第一步中msg所儲存的地方,通過呼叫outboundBuffer.addFlush(),將outboundBuffer 的unflushed置為tail,這樣本次等待flush的msg在buffer陣列中的位置區間就為[flushed, unflushed)。

//ChannelOutboundBuffer  
void addFlush() {  
        unflushed = tail;   
    } 

接下來的flush0方法將這個區間的msg寫到套接字的傳送緩衝區中。

//ChannelOutboundBuffer  
        protected void flush0() {  
            if (inFlush0) {  
                // Avoid re-entrance  
                return;  
            }  
  
            final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;  
            if (outboundBuffer == null || outboundBuffer.isEmpty()) {  
                return;  
            }  
  
            inFlush0 = true;  
  
            // Mark all pending write requests as failure if the channel is inactive.  
            if (!isActive()) {  
                try {  
                    if (isOpen()) {  
                        outboundBuffer.failFlushed(NOT_YET_CONNECTED_EXCEPTION);  
                    } else {  
                        outboundBuffer.failFlushed(CLOSED_CHANNEL_EXCEPTION);  
                    }  
                } finally {  
                    inFlush0 = false;  
                }  
                return;  
            }  
  
            try {  
                doWrite(outboundBuffer);  
            } catch (Throwable t) {  
                outboundBuffer.failFlushed(t);  
                if (t instanceof IOException) {  
                    close(voidPromise());  
                }  
            } finally {  
                inFlush0 = false;  
            }  
        }  

主要邏輯在doWrite方法裡

//NioSocketChannel  
 protected void doWrite(ChannelOutboundBuffer in) throws Exception {  
        for (;;) {  
            // Do non-gathering write for a single buffer case.  
            final int msgCount = in.size();  
            if (msgCount <= 1) {  
                super.doWrite(in);  
                return;  
            }  
  
            // Ensure the pending writes are made of ByteBufs only.  
            ByteBuffer[] nioBuffers = in.nioBuffers();  
            if (nioBuffers == null) {  
                super.doWrite(in);  
                return;  
            }  
  
            int nioBufferCnt = in.nioBufferCount();  
            long expectedWrittenBytes = in.nioBufferSize();  
  
            final SocketChannel ch = javaChannel();  
            long writtenBytes = 0;  
            boolean done = false;  
            boolean setOpWrite = false;  
            for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {  
                final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);  
                if (localWrittenBytes == 0) {  
                    setOpWrite = true;  
                    break;  
                }  
                expectedWrittenBytes -= localWrittenBytes;  
                writtenBytes += localWrittenBytes;  
                if (expectedWrittenBytes == 0) {  
                    done = true;  
                    break;  
                }  
            }  
  
            if (done) {  
                // Release all buffers  
                for (int i = msgCount; i > 0; i --) {  
                    in.remove();  
                }  
  
                // Finish the write loop if no new messages were flushed by in.remove().  
                if (in.isEmpty()) {  
                    clearOpWrite();  
                    break;  
                }  
            } else {  
                // Did not write all buffers completely.  
                // Release the fully written buffers and update the indexes of the partially written buffer.  
  
                for (int i = msgCount; i > 0; i --) {  
                    final ByteBuf buf = (ByteBuf) in.current();  
                    final int readerIndex = buf.readerIndex();  
                    final int readableBytes = buf.writerIndex() - readerIndex;  
  
                    if (readableBytes < writtenBytes) {  
                        in.progress(readableBytes);  
                        in.remove();  
                        writtenBytes -= readableBytes;  
                    } else if (readableBytes > writtenBytes) {  
                        buf.readerIndex(readerIndex + (int) writtenBytes);  
                        in.progress(writtenBytes);  
                        break;  
                    } else { // readableBytes == writtenBytes  
                        in.progress(readableBytes);  
                        in.remove();  
                        break;  
                    }  
                }  
  
                incompleteWrite(setOpWrite);  
                break;  
            }  
        }  
    }  

邏輯如下

  1. 如果ChannelOutboundBuffer的size<=1,即其中儲存的待發送的msg只佔用buffer陣列中的一個entry(buffer是一個Entry[]陣列,參見上一篇文章),則不需要採用gathering write的方式,可以直接呼叫父類AbstractNioByteChannel的doWrite方法。
  2. 如果size>1,即其中儲存的待發送的msg佔用buffer陣列中的至少兩個entry,則通過呼叫in.nioBuffers()方法對ChannelOutboundBuffer的Entry[]陣列變數buffer進行轉換:將每個Entry元素中儲存的msg由io.netty.buffer.ByteBuf型別轉換為java.nio.ByteBuffer型別。最終得到ByteBuffer[]陣列,並賦給變數buffers。
  3. 如果轉換後得到的陣列為空,即msg不是ByteBuf型別,則也不需要採用gathering write的方式,可以直接呼叫父類AbstractNioByteChannel的doWrite方法。
  4. 否則,執行gathering write方法。
    a) 首先分析super.doWrite(in)方法,即父類AbstractNioByteChannel的doWrite方法
protected void doWrite(ChannelOutboundBuffer in) throws Exception {  
        int writeSpinCount = -1;  
  
        for (;;) {  
            Object msg = in.current();  
            if (msg == null) {  
                // Wrote all messages.  
                clearOpWrite();  
                break;  
            }  
  
            if (msg instanceof ByteBuf) {  
                ByteBuf buf = (ByteBuf) msg;  
                int readableBytes = buf.readableBytes();  
                if (readableBytes == 0) {  
                    in.remove();  
                    continue;  
                }  
  
                if (!buf.isDirect()) {  
                    ByteBufAllocator alloc = alloc();  
                    if (alloc.isDirectBufferPooled()) {  
                        // Non-direct buffers are copied into JDK's own internal direct buffer on every I/O.  
                        // We can do a better job by using our pooled allocator. If the current allocator does not  
                        // pool a direct buffer, we rely on JDK's direct buffer pool.  
                        buf = alloc.directBuffer(readableBytes).writeBytes(buf);  
                        in.current(buf);  
                    }  
                }  
  
                boolean setOpWrite = false;  
                boolean done = false;  
                long flushedAmount = 0;  
                if (writeSpinCount == -1) {  
                    writeSpinCount = config().getWriteSpinCount();  
                }  
                for (int i = writeSpinCount - 1; i >= 0; i --) {  
                    int localFlushedAmount = doWriteBytes(buf);  
                    if (localFlushedAmount == 0) {  
                        setOpWrite = true;  
                        break;  
                    }  
  
                    flushedAmount += localFlushedAmount;  
                    if (!buf.isReadable()) {  
                        done = true;  
                        break;  
                    }  
                }  
  
                in.progress(flushedAmount);  
  
                if (done) {  
                    in.remove();  
                } else {  
                    incompleteWrite(setOpWrite);  
                    break;  
                }  
            } else if (msg instanceof FileRegion) {  
                FileRegion region = (FileRegion) msg;  
                boolean setOpWrite = false;  
                boolean done = false;  
                long flushedAmount = 0;  
                if (writeSpinCount == -1) {  
                    writeSpinCount = config().getWriteSpinCount();  
                }  
                for (int i = writeSpinCount - 1; i >= 0; i --) {  
                    long localFlushedAmount = doWriteFileRegion(region);  
                    if (localFlushedAmount == 0) {  
                        setOpWrite = true;  
                        break;  
                    }  
  
                    flushedAmount += localFlushedAmount;  
                    if (region.transfered() >= region.count()) {  
                        done = true;  
                        break;  
                    }  
                }  
  
                in.progress(flushedAmount);  
  
                if (done) {  
                    in.remove();  
                } else {  
                    incompleteWrite(setOpWrite);  
                    break;  
                }  
            } else {  
                throw new UnsupportedOperationException("unsupported message type: " + StringUtil.simpleClassName(msg));  
            }  
        }  
    }  
protected final void incompleteWrite(boolean setOpWrite) {  
        // Did not write completely.  
        if (setOpWrite) {  
            setOpWrite();  
        } else {  
            // Schedule flush again later so other tasks can be picked up in the meantime  
            Runnable flushTask = this.flushTask;  
            if (flushTask == null) {  
                flushTask = this.flushTask = new Runnable() {  
                    @Override  
                    public void run() {  
                        flush();  
                    }  
                };  
            }  
            eventLoop().execute(flushTask);  
        }  
    }  
  
protected final void setOpWrite() {  
        final SelectionKey key = selectionKey();  
        final int interestOps = key.interestOps();  
        if ((interestOps & SelectionKey.OP_WRITE) == 0) {  
            key.interestOps(interestOps | SelectionKey.OP_WRITE);  
        }  
    }  

說明:

  1. 如果當前msg為空,即buffer[flushed]儲存的msg為空,則說明所有msg已經發送完畢,所以需要清除selectionKey中的OP_WRITE位。
  2. 目前Netty僅支援兩種型別(ByteBuf和FileRegion)的寫操作,本文只對ByteBuf型別進行分析。
  3. 首先呼叫buf.readableBytes()判斷buf中是否有可讀的訊息,即writerIndex – readerIndex>0。如果結果為0,則執行in.remove方法;否則,採用類似於自旋鎖的邏輯對buf執行write操作。
//NioSocketChannel  
protected int doWriteBytes(ByteBuf buf) throws Exception {  
        final int expectedWrittenBytes = buf.readableBytes();  
        final int writtenBytes = buf.readBytes(javaChannel(), expectedWrittenBytes);  
        return writtenBytes;  
}  
//UnpooledHeapByteBuf  
public int readBytes(GatheringByteChannel out, int length) throws IOException {  
        checkReadableBytes(length);  
        int readBytes = getBytes(readerIndex, out, length, true);  
        readerIndex += readBytes;  
        return readBytes;  
}  
  
private int getBytes(int index, GatheringByteChannel out, int length, boolean internal) throws IOException {  
        ensureAccessible();  
        ByteBuffer tmpBuf;  
        if (internal) {  
            tmpBuf = internalNioBuffer();  
        } else {  
            tmpBuf = ByteBuffer.wrap(array);  
        }  
        return out.write((ByteBuffer) tmpBuf.clear().position(index).limit(index + length));  
    }  

由於是非阻塞IO,所以最終寫到傳送緩衝區中的位元組數writtenBytes可能會小於buf中期望寫出的位元組數expectedWrittenBytes。如果此時不再寫,而是依賴selector的非同步通知,則會導致buf裡剩下的資料不能及時寫出去(因為必須等到selector的下一次迴圈,即必須將本次迴圈中通知的未處理完的所有事件處理完後,以及剩下的task執行完後,然後再執行一次select,才能處理到這個channel的write事件;在這個過程中,還有可能會發生執行緒的上下文切換。這樣,就會導致msg寫到ChannelOutBoundBuffer後,會經歷較大的延遲才能將訊息flush到套接字的傳送緩衝區。

Netty採用類似於自旋鎖的邏輯,在一個迴圈內,多次呼叫write。這樣就有可能將buf中的所有資料在一次flush呼叫中寫完。迴圈的次數值為writeSpinCount,其預設值為16。

但是如果一次write呼叫返回0,則說明發送緩衝區已經完全沒有空間了,如果還繼續呼叫write,而系統呼叫開銷是比較大的,所以是一種浪費,此種情況可以退出迴圈,設定selectionKey的OP_WRITE位,以依賴selector的非同步通知。

如果在自旋期間多次呼叫write後,資料還是沒有寫完,而每次write呼叫的返回又不是0,說明每次的write確實寫出去了一些位元組,這種情況也不能立即退出flush再依賴selector的非同步通知,因為有可能是自旋鎖的迴圈次數設定小了導致buf的資料沒有傳送完,但實際傳送緩衝區還是有空間的。因此將剩下資料的寫作為一個非同步任務放到當前執行緒的任務佇列中,等待排程執行。這樣當本次迴圈中選擇的剩下的所有事件處理完後,就可以執行這個任務了,而不用等到由下次的selector喚醒。

如果msg已全部寫完畢,則執行in.remove()方法進行清理

//ChannelOutBoundBuffer  
public boolean remove() {  
        if (isEmpty()) {  
            return false;  
        }  
        Entry e = buffer[flushed];  
        Object msg = e.msg;  
        if (msg == null) {  
            return false;  
        }  
        ChannelPromise promise = e.promise;  
        int size = e.pendingSize;  
        e.clear();  
        flushed = flushed + 1 & buffer.length - 1;  
        safeRelease(msg);  
        promise.trySuccess();  
        decrementPendingOutboundBytes(size);  
        return true;  
    }  

首先對buffer[flushed]對應的Entry執行clear操作

//Entry  
public void clear() {  
            buffers = null;  
            buf = null;  
            msg = null;  
            promise = null;  
            progress = 0;  
            total = 0;  
            pendingSize = 0;  
            count = -1;  
        }  

然後將flunshed累加1,接著對msg執行基於引用計數的release操作,最後看一下decrementPendingOutboundBytes方法的實現

void decrementPendingOutboundBytes(int size) {  
        // Cache the channel and check for null to make sure we not produce a NPE in case of the Channel gets  
        // recycled while process this method.  
        Channel channel = this.channel;  
        if (size == 0 || channel == null) {  
            return;  
        }  
  
        long oldValue = totalPendingSize;  
        long newWriteBufferSize = oldValue - size;  
        while (!TOTAL_PENDING_SIZE_UPDATER.compareAndSet(this, oldValue, newWriteBufferSize)) {  
            oldValue = totalPendingSize;  
            newWriteBufferSize = oldValue - size;  
        }  
  
        int lowWaterMark = channel.config().getWriteBufferLowWaterMark();  
  
        if (newWriteBufferSize == 0 || newWriteBufferSize < lowWaterMark) {  
            if (WRITABLE_UPDATER.compareAndSet(this, 0, 1)) {  
                channel.pipeline().fireChannelWritabilityChanged();  
            }  
        }  
    }  

更新ChannelOutBoundBuffer中待發送的msg大小totalPendingSize,並判斷如果totalPendingSize小於channel的低水位線,則設定channel為可寫,並觸發ChannelWritabilityChanged事件。

b)最後,分析gathering write邏輯:

//NioSocketChannel  
protected void doWrite(ChannelOutboundBuffer in) throws Exception {  
        for (;;) {  
            //之前已介紹非gathering write的邏輯,所以此處省略相關程式碼  
            int nioBufferCnt = in.nioBufferCount();  
            long expectedWrittenBytes = in.nioBufferSize();  
            final SocketChannel ch = javaChannel();  
            long writtenBytes = 0;  
            boolean done = false;  
            for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {  
                final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);  
                if (localWrittenBytes == 0) {  
                    break;  
                }  
                expectedWrittenBytes -= localWrittenBytes;  
                writtenBytes += localWrittenBytes;  
                if (expectedWrittenBytes == 0) {  
                    done = true;  
                    break;  
                }  
            }  
            if (done) {  
                // Release all buffers  
                for (int i = msgCount; i > 0; i --) {  
                    in.remove();  
                }  
                // Finish the write loop if no new messages were flushed by in.remove().  
                if (in.isEmpty()) {  
                    clearOpWrite();  
                    break;  
                }  
            } else {  
                // Did not write all buffers completely.  
                // Release the fully written buffers and update the indexes of the partially written buffer.  
                for (int i = msgCount; i > 0; i --) {  
                    final ByteBuf buf = (ByteBuf) in.current();  
                    final int readerIndex = buf.readerIndex();  
                    final int readableBytes = buf.writerIndex() - readerIndex;  
                    if (readableBytes < writtenBytes) {  
                        in.progress(readableBytes);  
                        in.remove();  
                        writtenBytes -= readableBytes;  
                    } else if (readableBytes > writtenBytes) {  
                        buf.readerIndex(readerIndex + (int) writtenBytes);  
                        in.progress(writtenBytes);  
                        break;  
                    } else { // readableBytes == writtenBytes  
                        in.progress(readableBytes);  
                        in.remove();  
                        break;  
                    }  
                }  
  
                setOpWrite();  
                break;  
            }  
        }  
    }  

說明:

同樣採用類似於自旋鎖的方式執行gathering write
如果所有的msg都已flush到傳送緩衝區中,則對這些msg執行release;如果ChannelOutBoundBuffer的isEmpty方法返回true(即執行flush期間,沒有併發執行write操作導致ChannelOutBoundBuffer中新增待發送的msg,保持了unflushed不變。那麼此種情況下,unflushed=flushed),則清除selectionKey的OP_WRITE位;
如果還有msg未flush到傳送緩衝區中,則按照flushed->unflushed的順序對每一個ByteBuf進行處理,如果ByteBuf全部flush完,則進行release,否則僅僅更新該ByteBuf的readerIndex
incompleteWrite(setOpWrite)的作用之前已介紹過:如果最後一次write呼叫返回0,則說明發送緩衝區已經完全沒有空間了,此種情況可以退出迴圈,設定selectionKey的OP_WRITE位,以依賴selector的非同步通知。如果多次呼叫write後,資料還是沒有寫完,則將剩下資料的寫作為一個非同步任務放到當前執行緒的任務佇列中,等待排程執行。