1. 程式人生 > >Tomcat原始碼分析之:ServletOutputStream的實現

Tomcat原始碼分析之:ServletOutputStream的實現

貌似很久都沒有寫部落格了,tomcat8的程式碼已經看了很多,主體部分的程式碼也都看得差不多了,發現在tomcat8中已經完全支援非阻塞的方式接收以及傳送資料了。。。。但是比較遺憾的是,以前遺留下來的太多的老程式碼都不支援這種新的方式來發送資料。。。木有辦法。。。

這裡來看看Tomcat中是如何實現ServletOutputStream的吧。。。。

在具體的來看它之前,這裡先來一張圖來描述一下Tomcat的資料傳送時候的流動。。。


這張圖形已經比較好的展現成了Tomcat中對IO這一塊封裝的層次關係吧,首先是最上層的

ServletOutputStream物件,它是使用者程式碼可以接觸到的物件,它自帶了自己的OutputBuffer,這裡寫資料其實是通過這個這個buffer完成的,資料是寫到這個OutputBuffer裡面

接下來的一層就是Tomcat內部的Response型別了,每一個ServletResponse物件都對應著一個唯一的Tomcat內部的response型別,當然了,他也有自己的buffer,InternalNioOutputBuffer型別的物件,這裡上層ServletOutputStream寫資料將會流動到這裡。。。

最下層就是與channel關聯的部分了,它也有自己的buffer,這裡一般就是java類庫中的niobuffer,上層的資料將會流動到這裡。。。

最後才是將資料通過channel傳送出去。。。

嗯,雖然好像層次稍微多了些,而且剛剛開始看程式碼就覺得稍微有點繁瑣。。不過當將這個層次關係理清楚了之後還是蠻簡單的。。。

好啦,接下來來看程式碼了。。。

這裡就通過一次write來跟蹤整個程式碼的執行軌跡吧,首先來看看CoyoteOutputStream型別的write方法:

    //傳送一個byte陣列的一部分資料
    public void write(byte[] b, int off, int len) throws IOException {
        boolean nonBlocking = checkNonBlockingWrite();  //判斷是否支援非阻塞的傳送資料,這裡判斷的標準其實是使用者程式碼是否加入了WriteListener
        ob.write(b, off, len);  //將資料寫到outputbuffer裡面
        if (nonBlocking) {
            checkRegisterForWrite();  //如果是非阻塞的傳送資料的話,需要確保channel有註冊寫事件
        }
    }

這裡首先要判斷是否支援非阻塞的傳送資料,這裡就不細講這部分的類容了,,,來看看這裡的outputbuffer的write行為吧:

//寫入一個位元組陣列的一部分
    public void write(byte b[], int off, int len) throws IOException {

        if (suspended) {
            return;
        }

        writeBytes(b, off, len);

    }


    private void writeBytes(byte b[], int off, int len)
        throws IOException {

        if (closed) {  //如果已經關閉了,那麼直接返回吧
            return;
        }
        //這裡其實是寫到buffer裡,如果資料過大,也有可能呼叫realWriteBytes方法真正的呼叫底層寫資料
        bb.append(b, off, len);
        bytesWritten += len;

        // if called from within flush(), then immediately flush
        // remaining bytes
        if (doFlush) {
            bb.flushBuffer();
        }

    }

這裡其實就是將資料直接放到當年的bytechunk就好了。。。但是這裡如果資料比較多的話,會將資料直接寫到下一層,也就是tomcat內建的response。。。來看看bytechunk的append方法吧:

    //加入一個位元組陣列的一部分資料
    public void append( byte src[], int off, int len )
        throws IOException
    {
        // will grow, up to limit
        makeSpace( len );  //確保有這麼多空間可以寫

        // if we don't have limit: makeSpace can grow as it wants
        if( limit < 0 ) {  //表示沒有空間限制
            // assert: makeSpace made enough space
            System.arraycopy( src, off, buff, end, len );  //將資料複製過來
            end+=len;  //將end偏移加上len
            return;
        }

        // Optimize on a common case.
        // If the buffer is empty and the source is going to fill up all the
        // space in buffer, may as well write it directly to the output,
        // and avoid an extra copy
        //如果一次就填滿了,那麼還是寫出去好了
        if ( len == limit && end == start && out != null ) {
            out.realWriteBytes( src, off, len );  //因為要寫的資料比較大,所以直接寫到更下層去
            return;
        }
        // if we have limit and we're below
        if( len <= limit - end ) {  //表示還有足夠的空間可以寫資料
            // makeSpace will grow the buffer to the limit,
            // so we have space
            System.arraycopy( src, off, buff, end, len );
            end+=len;
            return;
        }

        // need more space than we can afford, need to flush
        // buffer

        // the buffer is already at ( or bigger than ) limit

        // We chunk the data into slices fitting in the buffer limit, although
        // if the data is written directly if it doesn't fit

        //程式碼執行到這裡,說明空間不夠了
        int avail=limit-end;  //還剩下多大的空間可以寫資料
        System.arraycopy(src, off, buff, end, avail);
        end += avail;

        flushBuffer(); 

        int remain = len - avail;

        while (remain > (limit - end)) {  //不斷的嘗試寫資料到下面去
            out.realWriteBytes( src, (off + len) - remain, limit - end );
            remain = remain - (limit - end);
        }

        System.arraycopy(src, (off + len) - remain, buff, end, remain);
        end += remain;

    }

這裡可以看到realWriteBytes方法,它其實就是外面的outputbuffer定義的方法,來看看吧:

    //這個才是真正的呼叫底層傳送資料,其實又是呼叫tomcat的response來寫資料
    public void realWriteBytes(byte buf[], int off, int cnt)
            throws IOException {

        if (closed) {
            return;
        }
        if (coyoteResponse == null) {
            return;
        }

        // If we really have something to write
        if (cnt > 0) {
            // real write to the adapter
            outputChunk.setBytes(buf, off, cnt);  //設定outputchunk
            try {
                coyoteResponse.doWrite(outputChunk);  //通過tomcat的response來寫資料,其實是寫到httpprocessor的buffer裡面去了
            } catch (IOException e) {
                // An IOException on a write is almost always due to
                // the remote client aborting the request.  Wrap this
                // so that it can be handled better by the error dispatcher.
                throw new ClientAbortException(e);
            }
        }

    }

嗯,這裡就與上面的層次對應上了吧,其實就是寫到tomcat內建的response。。好了,接下來繼續。。

    //在servlet的outputStream可能會呼叫這個方法來寫資料
    public void doWrite(ByteChunk chunk/*byte buffer[], int pos, int count*/)
        throws IOException
    {
        outputBuffer.doWrite(chunk, this);  //呼叫在httpprocessor裡面建立的outputbuffer來寫資料,這裡會將資料寫到niochannel的buffer,然後最終傳送出去
        contentWritten+=chunk.getLength();   //標記已經發送的資料量的大小
    }

嗯,這裡其實是寫到internalNiobuffer裡去。。。。繼續看吧:

 public int doWrite(ByteChunk chunk, Response res) throws IOException {

            int len = chunk.getLength();
            int start = chunk.getStart();
            byte[] b = chunk.getBuffer();
            addToBB(b, start, len);
            byteCount += chunk.getLength();  
            return chunk.getLength();
        }

嗯,沒啥意思,繼續:

    private synchronized void addToBB(byte[] buf, int offset, int length)
            throws IOException {

        if (length == 0) return;

        // Try to flush any data in the socket's write buffer first
        //首先嚐試先將資料傳送出去
        boolean dataLeft = flushBuffer(isBlocking());

        // Keep writing until all the data is written or a non-blocking write
        // leaves data in the buffer
        //這裡只有在緩衝區裡面已經沒有資料了才繼續傳送
        while (!dataLeft && length > 0) {
        	//首先將要傳送的資料copy到niochanel的傳送buffer裡面去
            int thisTime = transfer(buf,offset,length,socket.getBufHandler().getWriteBuffer());
            length = length - thisTime;  //計算還剩下多少位元組沒有寫到niochannel的buffer裡面,其實這裡也就當做將資料轉移到了niochannel的buffer就算是寫出去了
            offset = offset + thisTime;  //這裡用於調整偏移量
            //這裡呼叫writeToSocket方法將niochannel的buffer的裡面的資料通過socket寫出去
            int written = writeToSocket(socket.getBufHandler().getWriteBuffer(),
                    isBlocking(), true);  //如果在tomcat的response裡面有writelistener的話,可以非同步的寫
            if (written == 0) {  //都沒有寫出去位元組
                dataLeft = true;
            } else {
                dataLeft = flushBuffer(isBlocking());  //flush一下,看一下是否還會有資料剩餘
            }
        }

        NioEndpoint.KeyAttachment ka = (NioEndpoint.KeyAttachment)socket.getAttachment(false);
        if (ka != null) ka.access();//prevent timeouts for just doing client writes

        if (!isBlocking() && length > 0) { //在非阻塞的傳送中,如果實在傳送不出去,需要儲存在額外的buffer裡面
            // Remaining data must be buffered
            addToBuffers(buf, offset, length);
        }
    }

這裡其實主要是呼叫flushBuffer方法,將資料傳給下層的niochannel,而且可以看到對於過多的資料這裡還會 做一層快取。。。。

    //這裡其實是flush  niochannel的buffer
    protected boolean flushBuffer(boolean block) throws IOException {

        //prevent timeout for async,
        SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
        if (key != null) {
            NioEndpoint.KeyAttachment attach = (NioEndpoint.KeyAttachment) key.attachment();
            attach.access();
        }

        boolean dataLeft = hasMoreDataToFlush();   //其實這裡是判斷niochannel的buffer裡面是否還有資料要寫

        //write to the socket, if there is anything to write
        if (dataLeft) {  //如果niochannel的buffer裡面還有資料傳送,那麼繼續寫
            writeToSocket(socket.getBufHandler().getWriteBuffer(),block, !flipped);
        }

        dataLeft = hasMoreDataToFlush();

        //這裡如果niochannel的buffer裡面的資料已經發送完了,那麼將將以前緩衝的資料再發送出去
        if (!dataLeft && bufferedWrites.size() > 0) {
            Iterator<ByteBufferHolder> bufIter = bufferedWrites.iterator();  //遍歷待發送的資料
            while (!hasMoreDataToFlush() && bufIter.hasNext()) {
                ByteBufferHolder buffer = bufIter.next();
                buffer.flip();
                while (!hasMoreDataToFlush() && buffer.getBuf().remaining()>0) {
                    transfer(buffer.getBuf(), socket.getBufHandler().getWriteBuffer());
                    if (buffer.getBuf().remaining() == 0) {  //如果當前buffer裡面的所有資料都已經轉移到了niochannel的buffer裡面,那麼可以將這個buffer移除了
                        bufIter.remove();
                    }
                    writeToSocket(socket.getBufHandler().getWriteBuffer(),block, true);
                    //here we must break if we didn't finish the write
                }
            }
        }

        return hasMoreDataToFlush();
    }

嗯,這裡其實主要是呼叫writeToSocket方法。。。來看看吧:

    //這裡其實呼叫socket來寫資料
    private synchronized int writeToSocket(ByteBuffer bytebuffer, boolean block, boolean flip) throws IOException {
        if ( flip ) {
            bytebuffer.flip();
            flipped = true;
        }

        int written = 0;
        NioEndpoint.KeyAttachment att = (NioEndpoint.KeyAttachment)socket.getAttachment(false);
        if ( att == null ) throw new IOException("Key must be cancelled");
        long writeTimeout = att.getWriteTimeout();
        Selector selector = null;
        try {
            selector = pool.get();
        } catch ( IOException x ) {
            //ignore
        }
        try {
            written = pool.write(bytebuffer, socket, selector, writeTimeout, block);
            //make sure we are flushed
            do {
            	//對於niochanel,這個flush方法其實是沒用的
                if (socket.flush(true,selector,writeTimeout)) break;
            }while ( true );
        }finally {
            if ( selector != null ) {
            	pool.put(selector);
            }
        }
        if ( block || bytebuffer.remaining()==0) {
            //blocking writes must empty the buffer
            //and if remaining==0 then we did empty it
            bytebuffer.clear();
            flipped = false;
        }
        // If there is data left in the buffer the socket will be registered for
        // write further up the stack. This is to ensure the socket is only
        // registered for write once as both container and user code can trigger
        // write registration.
        return written;
    }

這裡因為涉及到了一些阻塞的或者非阻塞的傳送資料。。所以可能會用到selector。。。

    public int write(ByteBuffer buf, NioChannel socket, Selector selector,
                     long writeTimeout, boolean block) throws IOException {
        if ( SHARED && block ) { //對於寫資料,一般都是這裡
            return blockingSelector.write(buf,socket,writeTimeout);
        }
        //但是如果有outputstream的listener的話,可以採用非阻塞的方式來發送大量的資料
        SelectionKey key = null;
        int written = 0;
        boolean timedout = false;
        int keycount = 1; //assume we can write  //假裝剛開始是可以寫的
        long time = System.currentTimeMillis(); //start the timeout timer
        try {
            while ( (!timedout) && buf.hasRemaining() ) {
                int cnt = 0;
                if ( keycount > 0 ) { //only write if we were registered for a write
                    cnt = socket.write(buf); //write the data
                    if (cnt == -1) throw new EOFException();  //出錯了

                    written += cnt;
                    if (cnt > 0) {
                        time = System.currentTimeMillis(); //reset our timeout timer
                        continue; //we successfully wrote, try again without a selector
                    }
                    if (cnt==0 && (!block)) {   //這裡對於非阻塞的寫,就直接返回了
                    	break; //don't block
                    }
                }
                if ( selector != null ) {
                    //register OP_WRITE to the selector
                    if (key==null) key = socket.getIOChannel().register(selector, SelectionKey.OP_WRITE);
                    else key.interestOps(SelectionKey.OP_WRITE);
                    if (writeTimeout==0) {
                        timedout = buf.hasRemaining();
                    } else if (writeTimeout<0) {
                        keycount = selector.select();
                    } else {
                        keycount = selector.select(writeTimeout);
                    }
                }
                if (writeTimeout > 0 && (selector == null || keycount == 0) ) timedout = (System.currentTimeMillis()-time)>=writeTimeout;
            }//while
            if ( timedout ) throw new SocketTimeoutException();
        } finally {
            if (key != null) {
                key.cancel();
                if (selector != null) selector.selectNow();//removes the key from this selector
            }
        }
        return written;
    }

這裡如果就直接呼叫niochannel來發送資料了。。不過其實這裡還會涉及到將資料轉移到niochannel的buffer。。然後才傳送資料。。。

到這裡整個資料的流動層次對照著上面的圖形應該就算是 比較明白了吧。。。。