1. 程式人生 > >Java NIO學習筆記:結合原始碼分析+Reactor模式

Java NIO學習筆記:結合原始碼分析+Reactor模式

Java NIO和IO的主要區別

下表總結了Java NIO和IO之間的主要差別,我會更詳細地描述表中每部分的差異。

IO                           NIO 面向流                     面向緩衝 阻塞IO                    非阻塞IO 無                           選擇器

面向流與面向緩衝

Java NIO和IO之間第一個最大的區別是,IO是面向流的,NIO是面向緩衝區的。 Java IO面向流意味著每次從流中讀一個或多個位元組,直至讀取所有位元組,它們沒有被快取在任何地方。此外,它不能前後移動流中的資料。如果需要前後移動從流中讀取的資料,需要先將它快取到一個緩衝區。 Java NIO的緩衝導向方法略有不同。資料讀取到一個它稍後處理的緩衝區,需要時可在緩衝區中前後移動。這就增加了處理過程中的靈活性。但是,還需要檢查是否該緩衝區中包含所有您需要處理的資料。而且,需確保當更多的資料讀入緩衝區時,不要覆蓋緩衝區裡尚未處理的資料。

阻塞與非阻塞IO

Java IO的各種流(Stream)是阻塞的。這意味著,當一個執行緒呼叫read() 或 write()時,該執行緒被阻塞,直到有一些資料被讀取,或資料完全寫入。該執行緒在此期間不能再幹任何事情了。 Java NIO的非阻塞模式,使一個執行緒從某通道傳送請求讀取資料,但是它僅能得到目前可用的資料,如果目前沒有資料可用時,就什麼都不會獲取。而不是保持執行緒阻塞,所以直至資料變的可以讀取之前,該執行緒可以繼續做其他的事情。 非阻塞寫也是如此。一個執行緒請求寫入一些資料到某通道,但不需要等待它完全寫入,這個執行緒同時可以去做別的事情。 執行緒通常將非阻塞IO的空閒時間用於在其它通道上執行IO操作,所以一個單獨的執行緒現在可以管理多個輸入和輸出通道(channel)。

先上程式碼看看,傳統IO vs NIO

傳統IO讀取檔案 程式碼一覽

案例採用FileInputStream讀取檔案內容的: 

// 傳統IO模式讀取檔案內容
public void IoDemo() throws IOException {
    InputStream in =
            new BufferedInputStream(new FileInputStream("xx.txt"));
    byte[] buf = new byte[1024];
    int bytesRead = in.read(buf);

    while (bytesRead != -1) {
        for (int i = 0;i < bytesRead; i++) {
            System.out.println((char) buf[i]);
            bytesRead = in.read(buf);
        }
    }
    in.close();
}

輸出結果:(略)

然後看看NIO的程式碼怎麼寫

案例是對應的NIO(這裡通過RandomAccessFile進行操作,當然也可以通過FileInputStream.getChannel()進行操作):

// NIO模式讀取檔案內容
public void NioTest() throws IOException {
    RandomAccessFile aFile =
            new RandomAccessFile("xx.txt", "tw");
    FileChannel fileChannel = aFile.getChannel();
    ByteBuffer buf = ByteBuffer.allocate(1024);

    int bytesRead = fileChannel.read(buf);
    System.out.println(bytesRead);
    while (bytesRead != -1) {
        buf.flip();
        while (buf.hasRemaining()) {
            System.out.println((char) buf.get());
        }
        buf.compact();
        bytesRead = fileChannel.read(buf);
    }
    aFile.close();
}

輸出結果:(略)  通過程式碼的對比,應該能看出個大概,最起碼能發現NIO的實現方式比叫複雜。有了一個大概的印象可以進入下一步了。

NIO 核心元件

  1. Channel
  2. Buffer
  3. Selector

Channel

Java NIO Channel通道和流非常相似,主要有以下幾點區別:

  • Channel是雙向的,也就是說可讀也可寫。Stream是單向的(只能讀或者寫)。
  • Channel可以非同步讀寫。
  • Channel總是基於緩衝區Buffer來讀寫。

http://tutorials.jenkov.com/images/java-nio/overview-channels-buffers.png

Java NIO: Channels read data into Buffers, and Buffers write data into Channels

翻譯:Channels讀資料到Buffers,Buffers寫資料到Channels

這裡有個問題,就是往buffer裡寫資料為什麼是channel.read(buffer);呢,按理說“寫”對應的不應該是“write”嗎。我剛開始這裡也容易搞混。後來看了原始碼的解釋和重新思考了一下面向物件的思想,這個問題也就想明白了

  1. 首先面向物件的主體是物件,所有的操作都是圍繞著這個物件展開的。這裡channel就是物件,而read()這個方法是針對channel的。
  2. 再來看原始碼的解釋

注意這句:

Reads a sequence of bytes from this channel into the given buffer.

翻譯:從channel讀一些bytes到buffer裡。

小結:這個read()是針對channel而言的,意思就是從先從channel讀取一些bytes,然後再寫到buffer裡。

常用Channel

Channel就像他的名字一樣,只是個“管道”不儲存資料,是用來傳輸資料的,而且這種傳輸是雙向的,可以傳入也可以傳出。

下面列出Java NIO中最重要的集中Channel的實現:

  • FileChannel:檔案的資料讀寫。
  • DatagramChannel:UDP的資料讀寫。
  • SocketChannel:TCP的資料讀寫。
  • ServerSocketChannel:允許我們監聽TCP連結請求,每個請求會建立會一個SocketChannel.

Channel的基礎示例(Basic Channel Example)

這有一個利用FileChannel讀取資料到Buffer的例子:

注意:因為Channel要結合Buffer一起才能使用,所以這裡的Channel示例和下面的Buffer程式碼基本相同。更詳細的後面會講

RandomAccessFile aFile = new RandomAccessFile("xx.txt", "rw");
FileChannel inChannel = aFile.getChannel();
ByteBuffer buf = ByteBuffer.allocate(48);
int bytesRead = inChannel.read(buf);
while (bytesRead != -1) {
  System.out.println("Read " + bytesRead);
  buf.flip();
  while(buf.hasRemaining()){
        System.out.print((char) buf.get());
  }
  buf.clear();
  bytesRead = inChannel.read(buf);
}
aFile.close();

注意buf.flip()的呼叫,

如果想要從Buffer讀取資料,要呼叫flip()方法,把Buffer從寫模式(write mode)切換到讀模式(read mode)。

後面會詳細講file()

Buffer

Buffer顧名思義:緩衝區,實際上是一個容器,一個連續陣列。

Channel提供從檔案、網路讀取資料的渠道,但是讀寫的資料都必須經過Buffer。如下圖: 

向Buffer中寫資料有兩種方法:

  1. 從Channel寫到Buffer (fileChannel.read(buf))
  2. 通過Buffer的put()方法 (buf.put(…))

從Buffer中讀取資料同樣也有兩種方法:

  1. 從Buffer讀取到Channel (channel.write(buf))
  2. 使用get()方法從Buffer中讀取資料 (buf.get())

Buffer屬性:

  1. capacity
  2. position
  3. limit
  4. mark

這裡直接看原始碼

原始碼中這一局交代了他們之間的大小關係

// Invariants: mark <= position <= limit <= capacity

Buffer屬性解釋:

索引

說明

capacity

緩衝區陣列的總長度。固定不變

position

是下一個要讀取或寫入的元素的索引(注意!不是當前索引!)。0 <= position <= limit 

limit

是讀\寫模式下可讀\寫的最大範圍。0 <= limit <= capacity

mark

用於記錄當前position的位置,用於恢復position的位置。預設為-1。後面會講

Java NIO Buffers用於和NIO Channel互動。正如你已經知道的,我們從channel中讀取資料到buffers裡,從buffer把資料寫入到channels.

buffer本質上就是一塊記憶體區,可以用來寫入資料,並在稍後讀取出來。這塊記憶體被NIO Buffer包裹起來,對外提供一系列的讀寫方便開發的介面。

Buffer基本用法(Basic Buffer Usage)

利用Buffer讀寫資料,通常遵循四個步驟:

  • 把資料寫入buffer;
  • 呼叫flip;
  • 從Buffer中讀取資料;
  • 呼叫buffer.clear()或者buffer.compact()

當寫入資料到buffer中時,buffer會記錄已經寫入的資料大小。當需要讀資料時,通過flip()方法把buffer從寫模式調整為讀模式;在讀模式下,可以讀取所有已經寫入的資料。

當讀取完資料後,需要清空buffer,以滿足後續寫入操作。清空buffer有兩種方式:呼叫clear()或compact()方法。clear會清空整個buffer,compact則只清空已讀取的資料,未被讀取的資料會被移動到buffer的開始位置,寫入位置則近跟著未讀資料之後。

讀資料

這裡有一個簡單的buffer案例,包括了write,flip和clear操作:

RandomAccessFile aFile = new RandomAccessFile("xx.txt", "rw");
FileChannel inChannel = aFile.getChannel();
//create buffer with capacity of 48 bytes
ByteBuffer buf = ByteBuffer.allocate(48);
int bytesRead = inChannel.read(buf); //read into buffer.
while (bytesRead != -1) {
  buf.flip();  //make buffer ready for read
  while(buf.hasRemaining()){
      System.out.print((char) buf.get()); // read 1 byte at a time
  }
  buf.clear(); //make buffer ready for writing
  bytesRead = inChannel.read(buf);

}
aFile.close();

寫資料

下面給出通過FileChannel來向檔案中寫入資料的一個例子:

File file = new File("data.txt");
FileOutputStream outputStream = new FileOutputStream(file);
FileChannel channel = outputStream.getChannel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
String string = "java nio";
buffer.put(string.getBytes());
buffer.flip();     //此處必須要呼叫buffer的flip方法
channel.write(buffer);
channel.close();
outputStream.close();

通過上面的程式會向工程目錄下的data.txt檔案寫入字串"java nio",注意在呼叫channel的write方法之前必須呼叫buffer的flip方法,否則無法正確寫入內容,至於具體原因將在下篇博文中具體講述Buffer的用法時闡述。

常見Buffer型別

Java NIO有如下具體的Buffer型別:

  • ByteBuffer
  • MappedByteBuffer
  • CharBuffer
  • DoubleBuffer
  • FloatBuffer
  • IntBuffer
  • LongBuffer
  • ShortBuffer

Buffer常見方法,flip()

沒什麼比直接看原始碼學習更高效的了,直接上原始碼

如果你英文不錯的,官方的介紹其實是很清楚的。

我來稍微解釋下:

  1. flip()方法可以吧Buffer從寫模式(write mode)切換到讀模式(read mode)。
  2. 呼叫flip方法會把設定limit為之前的position的值,並把position歸零。
  3. 如果mark之前被設定了,那麼呼叫這個方法之後它將會被拋棄。

不太懂?沒關係,下面會有詳細的圖解

capacity,limit,position三個屬性的關係與圖解

給Buffer分配記憶體大小為10的快取區。

Buffer的容量為10,所以capacity為10,在這裡指向索引為10的空間。

Buffer初始化的時候,limit和capacity指向同一最後的那個索引,position指向0。

往Buffer里加一個資料。position位置移動,capacity不變,limit不變。

繼續加到5個數據,position指向索引為5的第6個數據,capacity不變,limit不變。

執行flip()

這時候對照著,之前flip原始碼去看。把position的值賦給limit,所以limit=5,然後position=0。capacity不變。結果就是:

Buffer開始往外寫資料。每寫一個,position就下移一個位置,一直移到limit的位置,結束。

所以讀完之後的狀態如圖

上圖的順序就是程式碼中的buffer從初始化,到寫資料,再讀資料三個狀態下,capacity,position,limit三個屬性的變化和關係。  大家可以發現:  1. 0 <= position <= limit <= capacity  2. capacity始終不變

圖中很好的闡述了,Buffer讀寫切換的過程。即flip()的反轉原理。接下來我們從程式碼中檢測上面的分析過程。想一下下面程式碼列印的內容,然後執行一編程式碼看看對不對。

public static void main(String[] args) {
    ByteBuffer buffer = ByteBuffer.allocate(10);
    System.out.println("初始化");
    System.out.println("position:" + buffer.position() +
            ",limit:" + buffer.limit() +
            ",capacity:" + buffer.capacity());
    String str = "abcde";
    buffer.put(str.getBytes());
    System.out.println("填充資料完畢後");
    System.out.println("position:" + buffer.position() +
            ",limit:" + buffer.limit() +
            ",capacity:" + buffer.capacity());

    buffer.flip();
    System.out.println("呼叫flip()後");
    System.out.println("position:" + buffer.position() +
            ",limit:" + buffer.limit() +
            ",capacity:" + buffer.capacity());

    System.out.println("開始讀取");
    while (buffer.hasRemaining()) {
        System.out.println("position:" + buffer.position() +
                ",limit:" + buffer.limit() +
                ",capacity:" + buffer.capacity());
        System.out.println("元素:" + Character.toString((char) buffer.get()));
    }
    System.out.println("讀取完畢");
    System.out.println("position:" + buffer.position() +
            ",limit:" + buffer.limit() +
            ",capacity:" + buffer.capacity());
}

執行結果

rewind()

直接上原始碼

原始碼講的很清楚,rewind()和flip()就相差一個

Limit = position;

clear() and compact()

一旦我們從buffer中讀取完資料,需要複用buffer為下次寫資料做準備。只需要呼叫clear或compact方法。

clear方法會重置position為0,limit為capacity,也就是整個Buffer清空。實際上Buffer中資料並沒有清空,我們只是把標記為修改了。

如果Buffer還有一些資料沒有讀取完,呼叫clear就會導致這部分資料被“遺忘”,因為我們沒有標記這部分資料未讀。

針對這種情況,如果需要保留未讀資料,那麼可以使用compact。 因此compact和clear的區別就在於對未讀資料的處理,是保留這部分資料還是一起清空。

mark() and reset()

提醒:mark()和reset()要搭配使用

使用步驟

  1. 先用mark()標記當前的position位置
  2. 後期再通過reset()恢復position到標記的位置。

mark原始碼

原始碼其實很清晰,就是 mark = position;

reset原始碼

把position恢復到之前通過mark()方法,標記的位置。

Channel與Buffer區別與聯絡

在NIO中並不是以流的方式來處理資料的,而是以buffer緩衝區和Channel管道配合使用來處理資料。

簡單理解一下:

  • Channel管道比作成鐵路,buffer緩衝區比作成火車(運載著貨物)

而我們的NIO就是通過Channel管道運輸著儲存資料的Buffer緩衝區的來實現資料的處理

  • 要時刻記住:Channel不與資料打交道,它只負責運輸資料。與資料打交道的是Buffer緩衝區
    • Channel-->運輸
    • Buffer-->資料

相對於傳統IO而言,流是單向的。對於NIO而言,有了Channel管道這個概念,我們的讀寫都是雙向的(鐵路上的火車能從廣州去北京、自然就能從北京返還到廣州)!

Channel通道只負責傳輸資料、不直接操作資料的。操作資料都是通過Buffer緩衝區來進行操作!

Selector

Selector就是用來管理多個Channel的。

使用步驟:

  1. 先把要管理的Channel註冊到Selector上
  2. 然後Selector能夠檢測多個註冊的Channel上是否有事件發生
  3. 如果有事件發生,便獲取事件然後針對每個事件進行相應的響應處理

這樣一來,只是用一個單執行緒就可以管理多個通道,也就是管理多個連線。這樣使得只有在連線真正有讀寫事件發生時,才會呼叫函式來進行讀寫,就大大地減少了系統開銷,並且不必為每個連線都建立一個執行緒,不用去維護多個執行緒,並且避免了多執行緒之間的上下文切換導致的開銷。

  與Selector有關的一個關鍵類是SelectionKey,一個SelectionKey表示一個到達的事件,這2個類構成了服務端處理業務的關鍵邏輯。

核心類

  1. Selector
  2. SelectionKey

先來看一下原始碼瞭解一下主要屬性和功能

Selector

SelectionKey

使用方法

建立Selector

Selector selector = Selector.open();

向Selector註冊通道

當向Selector中註冊Channel時,Channel必須是非阻塞的,所以不可以註冊FileChannel,因為FileChannel沒有實現SelectableChannel介面,不能配置為非阻塞狀態模式。

channel.configureBlocking(false);
SelectionKey key = channel.register(selector, Selectionkey.OP_READ);

當將通道Channel註冊到Selector中時,需要在第二個引數中指定相應觀察事件的集合interest集合,即SelectionKey中的幾個代表狀態的整數。如果想註冊多種狀態需要用位或(|)操作符進行連線。

SelectionKey.OP_CONNECT

連線就緒

SelectionKey.OP_ACCEPT

接收就緒

SelectionKey.OP_READ

讀就緒

SelectionKey.OP_WRITE

寫就緒

可以在註冊完Channel到Selector之後,通過獲取到的SelectionKey來獲取ready集合key.readyOps();,即可以得到所觀察事件是否就緒的一個位或操作之後的值,此時只需要使用相應的interest值與readyOps值取與(&)操作即可確定此事件是否就緒。或者使用JDK提供的四個API(而實際JDK底層程式碼也是取與操作進行判斷的)如下:

key.isConnectable();
key.isAcceptable();
key.isReadable();
key.isWritable();

SelectionKey中還可以新增一些附加物件來標識對應註冊的是哪個Channel。方法有兩種如下

Object attach = new Object();
// 方法1:在註冊時候的第三個引數指定
SelectionKey key = channel.register(selector, SelectionKey.OP_ACCEPT, attach);
// 方法2:使用attach()方法指定
key.attach(attach);

//獲取附加物件的方法
Object attached = key.attachment();

選擇通道

當向Selector中註冊了通道,就可以呼叫select來獲取有多少通道發生了我們所感興趣的(interest集合)事件和。該方法及其過載返回的int值表示有多少通道已經就緒。亦即,自上次呼叫select()方法後有多少通道變成就緒狀態。如果呼叫select()方法,因為有一個通道變成就緒狀態,返回了1,若再次呼叫select()方法,如果另一個通道就緒了,它會再次返回1。如果對第一個就緒的channel沒有做任何操作,現在就有兩個就緒的通道,但在每次select()方法呼叫之間,只有一個通道就緒了。

一旦select()方法返回非零,就可以通過selector.selectedKeys()方法獲得所有的以選擇即已就緒SelectionKey,通過遍歷獲取到是SelectionKey的哪個事件就緒。注意每次迭代末尾的keyIterator.remove()呼叫。Selector不會自己從已選擇鍵集中移除SelectionKey例項。必須在處理完通道時自己移除。下次該通道變成就緒時,Selector會再次將其放入已選擇鍵集中。

// 阻塞到至少有一個通道在你註冊的事件上就緒了。
selector.select();
// 和select()一樣,除了最長會阻塞timeout毫秒(引數)。
selector.select(1000);
// 不會阻塞,不管什麼通道就緒都立刻返回,如果無通道就緒,則立即返回零
selector.selectNow();

// 遍歷就緒的SelectionKey
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> iterator = keys.iterator();
while (iterator.hasNext()) {
     SelectionKey selectedKey = iterator.next();
     if (key.isAcceptable()) {
            // a connection was accepted by a ServerSocketChannel.
     } else if (key.isConnectable()) {
            // a connection was established with a remote server.
     } else if (key.isReadable()) {
            // a channel is ready for reading
     } else if (key.isWritable()) {
            // a channel is ready for writing
     }
     iterator.remove();
}

喚醒Selector

某個執行緒呼叫select()方法後阻塞了,即使沒有通道已經就緒,也有辦法讓其從select()方法返回。只要讓其它執行緒在第一個執行緒呼叫select()方法的那個物件上呼叫Selector.wakeup()方法即可。阻塞在select()方法上的執行緒會立馬返回。

如果有其它執行緒呼叫了wakeup()方法,但當前沒有執行緒阻塞在select()方法上,下個呼叫select()方法的執行緒會立即“醒來(wake up)”。

selector.wakeup();

關閉Selector

用完Selector後呼叫其close()方法會關閉該Selector,且使註冊到該Selector上的所有SelectionKey例項無效。通道本身並不會關閉。

selector.close();

Selector程式碼實戰

後面講Reactor模式的時候會有一個綜合程式碼

為什麼使用Selector(Why Use a Selector?)

用單執行緒處理多個channels的好處是我需要更少的執行緒來處理channel。實際上,你甚至可以用一個執行緒來處理所有的channels。從作業系統的角度來看,切換執行緒開銷是比較昂貴的,並且每個執行緒都需要佔用系統資源,因此暫用執行緒越少越好。

需要留意的是,現代作業系統和CPU在多工處理上已經變得越來越好,所以多執行緒帶來的影響也越來越小。如果一個CPU是多核的,如果不執行多工反而是浪費了機器的效能。不過這些設計討論是另外的話題了。簡而言之,通過Selector我們可以實現單執行緒操作多個channel。

這有一幅示意圖,描述了單執行緒處理三個channel的情況:

Java NIO: A Thread uses a Selector to handle 3 Channel's

其餘功能介紹

看完以上陳述,詳細大家對NIO有了一定的瞭解,下面主要通過幾個案例,來說明NIO的其餘功能,下面程式碼量偏多,功能性講述偏少。

零拷貝

Java NIO中提供的FileChannel擁有transferTo和transferFrom兩個方法,可直接把FileChannel中的資料拷貝到另外一個Channel,或者直接把另外一個Channel中的資料拷貝到FileChannel。該介面常被用於高效的網路/檔案的資料傳輸和大檔案拷貝。在作業系統支援的情況下,通過該方法傳輸資料並不需要將源資料從核心態拷貝到使用者態,再從使用者態拷貝到目標通道的核心態,同時也避免了兩次使用者態和核心態間的上下文切換,也即使用了“零拷貝”,所以其效能一般高於Java IO中提供的方法。

使用FileChannel的零拷貝將本地檔案內容傳輸到網路的示例程式碼如下所示。

public class NIOClient {
  public static void main(String[] args) throws IOException, InterruptedException {
    SocketChannel socketChannel = SocketChannel.open();
    InetSocketAddress address = new InetSocketAddress(1234);
    socketChannel.connect(address);
    RandomAccessFile file = new RandomAccessFile(
        NIOClient.class.getClassLoader().getResource("test.txt").getFile(), "rw");
    FileChannel channel = file.getChannel();
    channel.transferTo(0, channel.size(), socketChannel);
    channel.close();
    file.close();
    socketChannel.close();
  }
}

Scatter/Gatter

分散(scatter)從Channel中讀取是指在讀操作時將讀取的資料寫入多個buffer中。因此,Channel將從Channel中讀取的資料“分散(scatter)”到多個Buffer中。

聚集(gather)寫入Channel是指在寫操作時將多個buffer的資料寫入同一個Channel,因此,Channel 將多個Buffer中的資料“聚集(gather)”後傳送到Channel。

scatter / gather經常用於需要將傳輸的資料分開處理的場合,例如傳輸一個由訊息頭和訊息體組成的訊息,你可能會將訊息體和訊息頭分散到不同的buffer中,這樣你可以方便的處理訊息頭和訊息體。

案例:

public class ScattingAndGather
{
    public static void main(String args[]){
        gather();
    }

    public static void gather()
    {
        ByteBuffer header = ByteBuffer.allocate(10);
        ByteBuffer body = ByteBuffer.allocate(10);

        byte [] b1 = {'0', '1'};
        byte [] b2 = {'2', '3'};
        header.put(b1);
        body.put(b2);

        ByteBuffer [] buffs = {header, body};

        try
        {
            FileOutputStream os = new FileOutputStream("src/scattingAndGather.txt");
            FileChannel channel = os.getChannel();
            channel.write(buffs);
        }
        catch (IOException e)
        {
            e.printStackTrace();
        }
    }
}

Pipe

Java NIO 管道是2個執行緒之間的單向資料連線。Pipe有一個source通道和一個sink通道。

資料會被寫到sink通道,從source通道讀取。

public static void method1() throws IOException {
    Pipe pipe = Pipe.open();
    ExecutorService exec = Executors.newFixedThreadPool(2);
    final Pipe pipeTemp = pipe;

    exec.submit(() -> {
        // 向通道中寫資料
        Pipe.SinkChannel sinkChannel = pipeTemp.sink();
        while (true) {
            TimeUnit.SECONDS.sleep(1);
            String newData = "Pipe Test At Time " + System.currentTimeMillis();
            ByteBuffer buf = ByteBuffer.allocate(1024);
            buf.clear();
            buf.put(newData.getBytes());
            buf.flip();

            while (buf.hasRemaining()) {
                System.out.println(buf);
                sinkChannel.write(buf);
            }
        }
    });

    exec.submit(() -> {
        // 向通道中讀資料
        Pipe.SourceChannel sourceChannel =
                pipeTemp.source();
        while (true) {
            TimeUnit.SECONDS.sleep(1);
            ByteBuffer buf = ByteBuffer.allocate(1024);
            buf.clear();
            int bytesRead = sourceChannel.read(buf);
            System.out.println("bytesRead=" + bytesRead);
            while (bytesRead > 0) {
                buf.flip();
                byte b[] = new byte[bytesRead];
                int i = 0;
                while (buf.hasRemaining()) {
                    b[i] = buf.get();
                    System.out.println("%X" + b[i]);
                    i++;
                }
                String s = new String(b);
                System.out.println("===============||" + s);
                bytesRead = sourceChannel.read(buf);
            }
        }
    });
    exec.shutdown();
}

兩種高效能IO設計模式

  在傳統的網路服務設計模式中,有兩種比較經典的模式:

一種是 多執行緒,一種是執行緒池。

多執行緒

  對於多執行緒模式,也就說來了client,伺服器就會新建一個執行緒來處理該client的讀寫事件,如下圖所示:

這種模式雖然處理起來簡單方便,但是由於伺服器為每個client的連線都採用一個執行緒去處理,使得資源佔用非常大。因此,當連線數量達到上限時,再有使用者請求連線,直接會導致資源瓶頸,嚴重的可能會直接導致伺服器崩潰。

執行緒池

  因此,為了解決這種一個執行緒對應一個客戶端模式帶來的問題,提出了採用執行緒池的方式,也就說建立一個固定大小的執行緒池,來一個客戶端,就從執行緒池取一個空閒執行緒來處理,當客戶端處理完讀寫操作之後,就交出對執行緒的佔用。因此這樣就避免為每一個客戶端都要建立執行緒帶來的資源浪費,使得執行緒可以重用。

  但是執行緒池也有它的弊端,如果連線大多是長連線,因此可能會導致在一段時間內,執行緒池中的執行緒都被佔用,那麼當再有使用者請求連線時,由於沒有可用的空閒執行緒來處理,就會導致客戶端連線失敗,從而影響使用者體驗。因此,執行緒池比較適合大量的短連線應用。

因此便出現了下面的兩種高效能IO設計模式:Reactor和Proactor

Reactor

  在Reactor模式中,會先對每個client註冊感興趣的事件,然後有一個執行緒專門去輪詢每個client是否有事件發生,當有事件發生時,便順序處理每個事件,當所有事件處理完之後,便再轉去繼續輪詢,如下圖所示:

從這裡可以看出,多路複用IO就是採用Reactor模式。注意,上面的圖中展示的 是順序處理每個事件,當然為了提高事件處理速度,可以通過多執行緒或者執行緒池的方式來處理事件。

Proactor

在Proactor模式中,當檢測到有事件發生時,會新起一個非同步操作,然後交由核心執行緒去處理,當核心執行緒完成IO操作之後,傳送一個通知告知操作已完成,可以得知,非同步IO模型採用的就是Proactor模式。

實戰Reactor模式

以搭建一個伺服器為例子

  1. 經典Reactor模式
  2. 多執行緒Reactor模式

經典Reactor模式

經典的Reactor模式示意圖如下所示。

在Reactor模式中,包含如下角色

  1. Reactor 將I/O事件發派給對應的Handler
  2. Acceptor 處理客戶端連線請求
  3. Handlers 執行非阻塞讀/寫

最簡單的Reactor模式實現程式碼如下所示。

public static void main(String[] args) throws IOException {
    Selector selector = Selector.open();
    ServerSocketChannel serverSocketChannel
            = ServerSocketChannel.open();
    serverSocketChannel.configureBlocking(false);
    serverSocketChannel.bind(new InetSocketAddress(1234));
    serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
    while (selector.select() > 0) {
        Set<SelectionKey> keys = selector.selectedKeys();
        Iterator<SelectionKey> iterator = keys.iterator();
        while (iterator.hasNext()) {
            SelectionKey key = iterator.next();
            iterator.remove();
            if (key.isAcceptable()) {
                ServerSocketChannel acceptServerSocketChannel
                        = (ServerSocketChannel) key.channel();
                SocketChannel socketChannel =
                        acceptServerSocketChannel.accept();
                System.out.println("Accept request from " + socketChannel.getRemoteAddress());
                socketChannel.register(selector, SelectionKey.OP_READ);

            } else if (key.isReadable()) {
                SocketChannel socketChannel = (SocketChannel) key.channel();
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                int count = socketChannel.read(buffer);
                if (count <= 0) {
                    socketChannel.close();
                    key.cancel();
                    System.out.println("Received invalid data, close the connection");
                    continue;
                }
            }
            keys.remove(key);
        }
    }
}

為了方便閱讀,上示程式碼將Reactor模式中的所有角色放在了一個類中。

從上示程式碼中可以看到,多個Channel可以註冊到同一個Selector物件上,實現了一個執行緒同時監控多個請求狀態(Channel)。同時註冊時需要指定它所關注的事件,例如上示程式碼中socketServerChannel物件只註冊了OP_ACCEPT事件,而socketChannel物件只註冊了OP_READ事件。

selector.select()是阻塞的,當有至少一個通道可用時該方法返回可用通道個數。同時該方法只捕獲Channel註冊時指定的所關注的事件。

多工作執行緒Reactor模式

經典Reactor模式中,儘管一個執行緒可同時監控多個請求(Channel),但是所有讀/寫請求以及對新連線請求的處理都在同一個執行緒中處理,無法充分利用多CPU的優勢,同時讀/寫操作也會阻塞對新連線請求的處理。因此可以引入多執行緒,並行處理多個讀/寫操作,如下圖所示。

Netty中使用的Reactor模式,引入了多Reactor,也即一個主Reactor負責監控所有的連線請求,多個子Reactor負責監控並處理讀/寫請求,減輕了主Reactor的壓力,降低了主Reactor壓力太大而造成的延遲。

並且每個子Reactor分別屬於一個獨立的執行緒,每個成功連線後的Channel的所有操作由同一個執行緒處理。這樣保證了同一請求的所有狀態和上下文在同一個執行緒中,避免了不必要的上下文切換,同時也方便了監控請求響應狀態。

多執行緒Reactor模式示例程式碼如下所示。

多執行緒Reactor程式碼:

public static void main(String[] args) throws IOException {
    Selector selector = Selector.open();
    ServerSocketChannel serverSocketChannel =
            ServerSocketChannel.open();
    serverSocketChannel.configureBlocking(false);
    serverSocketChannel.bind(new InetSocketAddress(1234));
    serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
    while (true) {
        if (selector.selectNow() < 0) {
            continue;
        }
        Set<SelectionKey> keys = selector.selectedKeys();
        Iterator<SelectionKey> iterator = keys.iterator();
        while (iterator.hasNext()) {
            SelectionKey key = iterator.next();
            iterator.remove();
            if (key.isAcceptable()) {
                ServerSocketChannel acceptServerSocketChannel =
                        (ServerSocketChannel) key.channel();
                SocketChannel socketChannel = acceptServerSocketChannel.accept();
                socketChannel.configureBlocking(false);
                System.out.println("Accept request from " + socketChannel.getRemoteAddress());
                SelectionKey readKey = socketChannel.register(selector,
                        SelectionKey.OP_READ);
                readKey.attach(new Processor());
            } else if (key.isReadable()) {
                Processor processor = (Processor) key.attachment();
                processor.process(key);
            }
        }
    }
}

從上示程式碼中可以看到,註冊完SocketChannel的OP_READ事件後,可以對相應的SelectionKey attach一個物件(本例中attach了一個Processor物件,該物件處理讀請求),並且在獲取到可讀事件後,可以取出該物件。

注:attach物件及取出該物件是NIO提供的一種操作,但該操作並非Reactor模式的必要操作,本文使用它,只是為了方便演示NIO的介面。

具體的讀請求處理在如下所示的Processor類中。該類中設定了一個靜態的執行緒池處理所有請求。而process方法並不直接處理I/O請求,而是把該I/O操作提交給上述執行緒池去處理,這樣就充分利用了多執行緒的優勢,同時將對新連線的處理和讀/寫操作的處理放在了不同的執行緒中,讀/寫操作不再阻塞對新連線請求的處理。

public class Processor {
    private static final ExecutorService service =
            Executors.newFixedThreadPool(16);
    public void process(final SelectionKey selectionKey) {
        service.submit(() -> {
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
            int count = socketChannel.read(buffer);
            if (count < 0) {
                socketChannel.close();
                selectionKey.cancel();
                System.out.println("Read ended" + socketChannel);
                return null;
            } else if (count == 0) {
                return null;
            }
            return null;
        });
    }
}

參考: