1. 程式人生 > >ZooKeeper-客戶端連線ServerCnxn之NIOServerCnxn

ZooKeeper-客戶端連線ServerCnxn之NIOServerCnxn

背景

ServerCnxn代表了一個客戶端與一個server的連線,其有兩種實現,分別是NIOServerCnxnNettyServerCnxn,類圖如下:
這裡寫圖片描述
本文介紹ZooKeeper是如何通過NIOServerCnxn實現網路IO的.

處理read事件

發生時機

SocketChannel上有資料可讀時,worker thread呼叫NIOServerCnxn.doIO()進行讀操作

粘包拆包問題

處理讀事件比較麻煩的問題就是通過TCP傳送的報文會出現粘包拆包問題,Zookeeper為了解決此問題,在設計通訊協議時將報文分為3個部分:

  1. 請求頭和請求體的長度(4個位元組)
  2. 請求頭
  3. 請求體

注:(1)請求頭和請求體也細分為更小的部分,但在此不做深入研究,只需知道請求的前4個位元組是請求頭和請求體的長度即可.(2)將請求頭和請求體稱之為payload
在報文頭增加了4個位元組的長度欄位,表示整個報文除長度欄位之外的長度.服務端可根據該長度將粘包拆包的報文分離或組合為完整的報文.NIOServerCnxn讀取資料流程如下:

  1. NIOServerCnxn中有兩個屬性,一個是lenBuffer,容量為4個位元組,用於讀取長度資訊.一個是incomingBuffer,其初始化時即為lenBuffer,但是讀取長度資訊後,就為incomingBuffer分配對應的空間用於讀取payload
  2. 根據請求報文的長度分配incomingBuffer的大小
  3. 將讀到的位元組存放在incomingBuffer中,直至讀滿(由於第2步中為incomingBuffer分配的長度剛好是報文的長度,此時incomingBuffer中剛好時一個報文)
  4. 處理報文

程式碼如下:

 void doIO(SelectionKey k) throws InterruptedException {
        try {
            ...
           /*
            處理讀操作的流程
            1.最開始incomingBuffer就是lenBuffer,容量為4.第一次讀取4個位元組,即此次請求報文的長度
            2.根據請求報文的長度分配incomingBuffer的大小
            3.將讀到的位元組存放在incomingBuffer中,直至讀滿
             (由於第2步中為incomingBuffer分配的長度剛好是報文的長度,此時incomingBuffer中剛好時一個報文)
            4.處理報文
            */
if (k.isReadable()) { //若是客戶端請求,此時觸發讀事件 //初始化時incomingBuffer即時lengthBuffer,只分配了4個位元組,供使用者讀取一個int(此int值就是此次請求報文的總長度) int rc = sock.read(incomingBuffer); if (rc < 0) { throw new EndOfStreamException( "Unable to read additional data from client sessionid 0x" + Long.toHexString(sessionId) + ", likely client has closed socket"); } /* 只有incomingBuffer.remaining() == 0,才會進行下一步的處理,否則一直讀取資料直到incomingBuffer讀滿,此時有兩種可能: 1.incomingBuffer就是lenBuffer,此時incomingBuffer的內容是此次請求報文的長度. 根據lenBuffer為incomingBuffer分配空間後呼叫readPayload(). 在readPayload()中會立馬進行一次資料讀取,(1)若可以將incomingBuffer讀滿,則incomingBuffer中就是一個完整的請求,處理該請求; (2)若不能將incomingBuffer讀滿,說明出現了拆包問題,此時不能構造一個完整的請求,只能等待客戶端繼續傳送資料,等到下次socketChannel可讀時,繼續將資料讀取到incomingBuffer中 2.incomingBuffer不是lenBuffer,說明上次讀取時出現了拆包問題,incomingBuffer中只有一個請求的部分資料. 而這次讀取的資料加上上次讀取的資料湊成了一個完整的請求,呼叫readPayload() */ if (incomingBuffer.remaining() == 0) { boolean isPayload; if (incomingBuffer == lenBuffer) { // start of next request //解析上文中讀取的報文總長度,同時為"incomingBuffer"分配len的空間供讀取全部報文 incomingBuffer.flip(); //為incomeingBuffer分配空間時還包括了判斷是否是"4字命令"的邏輯 isPayload = readLength(k); incomingBuffer.clear(); } else { //2.incomingBuffer不是lenBuffer,此時incomingBuffer的內容是payload // continuation isPayload = true; } if (isPayload) { // not the case for 4letterword //處理報文 readPayload(); } else { // four letter words take care // need not do anything else return; } } } ... } catch (CancelledKeyException e) { ... } }
    /**
     * 有兩種情況會呼叫此方法:
     * 1.根據lengthBuffer的值為incomingBuffer分配空間後,此時尚未將資料從socketChannel讀取至incomingBuffer中
     * 2.已經將資料從socketChannel中讀取至incomingBuffer,且讀取完畢
     * <p>
     * Read the request payload (everything following the length prefix)
     */
    private void readPayload() throws IOException, InterruptedException {
        // have we read length bytes?
        if (incomingBuffer.remaining() != 0) {
            // sock is non-blocking, so ok
            //對應情況1,此時剛為incomingBuffer分配空間,incomingBuffer為空,進行一次資料讀取
            //(1)若將incomingBuffer讀滿,則直接進行處理;
            //(2)若未將incomingBuffer讀滿,則說明此次傳送的資料不能構成一個完整的請求,則等待下一次資料到達後呼叫doIo()時再次將資料
            //從socketChannel讀取至incomingBuffer
            int rc = sock.read(incomingBuffer);
            if (rc < 0) {
                throw new EndOfStreamException(
                        "Unable to read additional data from client sessionid 0x"
                                + Long.toHexString(sessionId)
                                + ", likely client has closed socket");
            }
        }
        // have we read length bytes?
        if (incomingBuffer.remaining() == 0) {
            //不管是情況1還是情況2,此時incomingBuffer已讀滿,其中內容必是一個request,處理該request
            //更新統計值
            packetReceived();
            incomingBuffer.flip();
            if (!initialized) {
                //處理連線請求
                readConnectRequest();
            } else {
                //處理普通請求
                readRequest();
            }
            //請求處理結束,重置lenBuffer和incomingBuffer
            lenBuffer.clear();
            incomingBuffer = lenBuffer;
        }
    }

解決粘包拆包的思路如上所述,程式碼中增加了很多註釋.

思考

個人認為,上述資料讀取過程一次至多讀取一個請求,即使在此次可讀取的資料中包含多個請求也是如此.而TCP報文的MSS一般為1460,客戶端的請求為50~100位元組,在客戶端請求非常頻繁時,一個TCP報文完全可以包含多個請求.
為了解決該問題,可以增加一個屬性outgoingIncomingBuffer,其資料型別為List<ByteBuffer>用於存放此次讀取的完整的請求,這樣就可將此次可讀取的資料全部讀取完畢,無需等到下一次selector.select(),減輕了selector.select()的負擔.

處理write事件

發生時機

SocketChannel可寫時,worker thread呼叫NIOServerCnxn.doIO()進行寫操作

DirectByteBuffer

由於Zookeeper中使用了DirectByteBuffer進行IO操作,在此簡單介紹下DirectByteBufferHeapByteBuffer的區別.
HeapByteBuffer是在堆上分配的記憶體,而DirectByteBuffer是在堆外分配的記憶體,又稱直接記憶體.使用HeapByteBuffer進行IO時,比如呼叫FileChannel.write(HeapByteBuffer)將資料寫到File中時,有兩個步驟:

  1. HeapByteBuffer的資料拷貝到DirectByteBuffer
  2. 再從堆外記憶體將資料寫入到檔案中.

問題1:為什麼要將HeapByteBuffer的資料拷貝到DirectByteBuffer呢?不能將資料直接從HeapByteBuffer拷貝到檔案中嗎?

並不是說作業系統無法直接訪問jvm中分配的記憶體區域,顯然作業系統是可以訪問所有的本機記憶體區域的,但是為什麼對io的操作都需要將jvm記憶體區的資料拷貝到堆外記憶體呢?是因為jvm需要進行GC,如果io裝置直接和jvm堆上的資料進行互動,這個時候jvm進行了GC,那麼有可能會導致沒有被回收的資料進行了壓縮,位置被移動到了連續的儲存區域,這樣會導致正在進行的io操作相關的資料全部亂套,顯然是不合理的,所以對io的操作會將jvm的資料拷貝至堆外記憶體,然後再進行處理,將不會被jvm上GC的操作影響。

問題2:DirectByteBuffer是相當於固定的核心buffer還是JVM程序內的堆外記憶體?
不管是Java堆還是直接記憶體,都是JVM程序通過malloc申請的記憶體,其都是使用者空間的記憶體,只不過是JVM程序將這兩塊使用者空間的記憶體用作不同的用處罷了.Java記憶體模型如下:
這裡寫圖片描述

問題3:將HeapByteBuffer的資料拷貝到DirectByteBuffer這一過程是作業系統執行還是JVM執行?
在問題2中已經回答,DirectByteBuffer是JVM程序申請的使用者空間記憶體,其使用和分配都是由JVM程序管理,因此這一過程是JVM執行的.也正是因為JVM知道堆記憶體會經常GC,資料地址經常移動,而底層通過write,read,pwrite,pread等函式進行系統呼叫時,需要傳入buffer的起始地址和buffer count作為引數,因此JVM在執行讀寫時會做判斷,若是HeapByteBuffer,就將其拷貝到直接記憶體後再呼叫系統呼叫執行步驟2.
程式碼在sun.nio.ch.IOUtil.write()sun.nio.ch.IOUtil.read()中,我們看下write()的程式碼:
知乎不能複製,程式碼地址如下:Java NIO direct buffer的優勢在哪兒?,第一個答案中有程式碼.

問題4:在將資料寫到檔案的過程中需要將資料拷貝到核心空間嗎?
需要.在步驟3中,是不能直接將資料從直接記憶體拷貝到檔案中的,需要將資料從直接記憶體->核心空間->檔案,因此使用DirectByteBuffer代替HeapByteBuffer也只是減少了資料拷貝的一個步驟,但對效能已經有提升了.

問題5:還有其他減少資料拷貝的方法嗎?
有,我目前知道的有兩種,分別是sendFile系統呼叫記憶體對映.
比如想要將資料從磁碟檔案傳送到socket,使用read/write系統呼叫需要將資料從磁碟檔案->read buffer(核心空間中)->使用者空間->socket buffer(也在核心空間中)->NIC buffer(網絡卡),而使用sendFile(即FileChannel.transferTo())系統呼叫就可減少複製到使用者空間的過程,變為資料從磁碟檔案->read buffer(核心空間中)->socket buffer(也在核心空間中)->NIC buffer(網絡卡),當然,還會有其他的優化手段,詳見什麼是Zero-Copy?
記憶體對映我也不是很清楚,詳見JAVA NIO之淺談記憶體對映檔案原理與DirectMemory

問題6:netty中使用了哪幾種方式實現高效IO?
netty中使用了3種方式實現其zero-copy機制,如下:

  1. 使用DirectByteBuffer
  2. 使用FileChannel.transferTo()
  3. ntty提供了組合Buffer物件,可以聚合多個ByteBuffer物件,使用者可以像操作一個Buffer那樣方便的對組合Buffer進行操作,避免了傳統通過記憶體拷貝的方式將幾個小Buffer合併成一個大的Buffer

NIOServerCnxnFactory中的直接記憶體

    /**
     * 使用其執行高效的socket I/O,由於I/O由worker thread執行,因此將直接記憶體設定為ThreadLocal的.
     * 各連線可以在共享直接記憶體的同時無需擔心併發問題.
     * <p>
     * We use this buffer to do efficient socket I/O. Because I/O is handled
     * by the worker threads (or the selector threads directly, if no worker
     * thread pool is created), we can create a fixed set of these to be
     * shared by connections.
     */
    private static final ThreadLocal<ByteBuffer> directBuffer =
            new ThreadLocal<ByteBuffer>() {
                @Override
                protected ByteBuffer initialValue() {
                    return ByteBuffer.allocateDirect(directBufferBytes);
                }
            };

NIOServerCnxnFactory中,設定了ThreadLocal型別的DirectByteBuffer,其容量由系統屬性zookeeper.nio.directBufferBytes控制,預設為64K.

原始碼

 /**
     * 當{@link #sock}可寫時呼叫該方法
     *
     * @param k {@link #sock}關聯的SelectionKey
     */
    void handleWrite(SelectionKey k) throws IOException, CloseRequestException {
        if (outgoingBuffers.isEmpty()) {
            return;
        }

        /*
         * 嘗試獲取直接記憶體
         */
        ByteBuffer directBuffer = NIOServerCnxnFactory.getDirectBuffer();
        if (directBuffer == null) {
            //不使用直接記憶體
            ByteBuffer[] bufferList = new ByteBuffer[outgoingBuffers.size()];
            sock.write(outgoingBuffers.toArray(bufferList));

            // Remove the buffers that we have sent
            ByteBuffer bb;
            while ((bb = outgoingBuffers.peek()) != null) {
                if (bb == ServerCnxnFactory.closeConn) {
                    throw new CloseRequestException("close requested");
                }
                if (bb.remaining() > 0) {
                    break;
                }
                packetSent();
                outgoingBuffers.remove();
            }
        } else {
            //使用直接記憶體
            directBuffer.clear();

            for (ByteBuffer b : outgoingBuffers) {
                if (directBuffer.remaining() < b.remaining()) {
                    /*
                     * 若directBuffer的剩餘可寫空間不足以容納b的所有資料,則修改b的limit為directBuffer的剩餘可寫空間.
                     * 這樣下面的複製程式碼剛好將directBuffer的可寫空間寫滿
                     */
                    b = (ByteBuffer) b.slice().limit(directBuffer.remaining());
                }
                /*
                 * put()會修改b和directBuffer的position值,但是我們不能修改b的position值,
                 * 因為下文需要position的值將已傳送的資料移出outgoingBuffers,因此在複製結束後重置position值.
                 *
                 */
                int p = b.position();
                //將b中的資料複製到directBuffer中
                directBuffer.put(b);
                b.position(p);
                if (directBuffer.remaining() == 0) {
                    break;
                }
            }
            /*
             * Do the flip: limit becomes position, position gets set to
             * 0. This sets us up for the write.
             */
            directBuffer.flip();

            //返回傳送的位元組數,下文據此移除已傳送的資料
            int sent = sock.write(directBuffer);

            ByteBuffer bb;

            // 將已傳送的buffers從outgoingBuffers中移除
            while ((bb = outgoingBuffers.peek()) != null) {
                if (bb == ServerCnxnFactory.closeConn) {
                    throw new CloseRequestException("close requested");
                }
                if (sent < bb.remaining()) {
                    /*
                     * 只發送了此Buffer的部分資料,因此修改position的值並退出迴圈
                     */
                    bb.position(bb.position() + sent);
                    break;
                }
                packetSent();
                //該buffer的資料已經全部發送,將buffer從outgoingBuffers中移除
                sent -= bb.remaining();
                outgoingBuffers.remove();
            }
        }
    }

從程式碼中可以看出,若分配了直接記憶體,則優先使用直接記憶體傳送資料.此外,從outgoingBuffers中獲取待發送的資料,outgoingBuffers作用是將構造響應傳送響應解耦(即處理請求獲取響應和將響應傳送給客戶端兩個操作非同步執行).響應構造成功後就新增至outgoingBuffers中,當可以傳送資料時,就從outgoingBuffers中獲取資料傳送.
通過sendBuffer()將待發送的資料新增至outgoingBuffers中,很多方法都會呼叫sendBuffer(),如NIOServerCnxn.sendResponse(),NIOServerCnxn.sendCloseSession(),ZookeeperServer.finishSessionInit()等,其中FinalRequestProcessor處理完請求後呼叫NIOServerCnxn.sendResponse().

    /**
     * sendBuffer pushes a byte buffer onto the outgoing buffer queue for
     * asynchronous writes.
     */
    @Override
    public void sendBuffer(ByteBuffer bb) {
        if (LOG.isTraceEnabled()) {
            LOG.trace("Add a buffer to outgoingBuffers, sk " + sk
                    + " is valid: " + sk.isValid());
        }
        outgoingBuffers.add(bb);
        requestInterestOpsUpdate();
    }

總結

  1. 粘包拆包問題的解決
  2. 直接記憶體的使用
  3. 非同步的思想:在請求處理鏈執行緒中構造響應,在worker thread中傳送響應,執行緒間通過outgoingBuffers通訊,將構造響應傳送響應非同步化

參考