Java NIO學習筆記:結合原始碼分析+Reactor模式
Java NIO和IO的主要區別
下表總結了Java NIO和IO之間的主要差別,我會更詳細地描述表中每部分的差異。
IO NIO 面向流 面向緩衝 阻塞IO 非阻塞IO 無 選擇器 |
面向流與面向緩衝
Java NIO和IO之間第一個最大的區別是,IO是面向流的,NIO是面向緩衝區的。 Java IO面向流意味著每次從流中讀一個或多個位元組,直至讀取所有位元組,它們沒有被快取在任何地方。此外,它不能前後移動流中的資料。如果需要前後移動從流中讀取的資料,需要先將它快取到一個緩衝區。 Java NIO的緩衝導向方法略有不同。資料讀取到一個它稍後處理的緩衝區,需要時可在緩衝區中前後移動。這就增加了處理過程中的靈活性。但是,還需要檢查是否該緩衝區中包含所有您需要處理的資料。而且,需確保當更多的資料讀入緩衝區時,不要覆蓋緩衝區裡尚未處理的資料。
阻塞與非阻塞IO
Java IO的各種流(Stream)是阻塞的。這意味著,當一個執行緒呼叫read() 或 write()時,該執行緒被阻塞,直到有一些資料被讀取,或資料完全寫入。該執行緒在此期間不能再幹任何事情了。 Java NIO的非阻塞模式,使一個執行緒從某通道傳送請求讀取資料,但是它僅能得到目前可用的資料,如果目前沒有資料可用時,就什麼都不會獲取。而不是保持執行緒阻塞,所以直至資料變的可以讀取之前,該執行緒可以繼續做其他的事情。 非阻塞寫也是如此。一個執行緒請求寫入一些資料到某通道,但不需要等待它完全寫入,這個執行緒同時可以去做別的事情。 執行緒通常將非阻塞IO的空閒時間用於在其它通道上執行IO操作,所以一個單獨的執行緒現在可以管理多個輸入和輸出通道(channel)。
先上程式碼看看,傳統IO vs NIO
傳統IO讀取檔案 程式碼一覽
案例採用FileInputStream讀取檔案內容的:
// 傳統IO模式讀取檔案內容 public void IoDemo() throws IOException { InputStream in = new BufferedInputStream(new FileInputStream("xx.txt")); byte[] buf = new byte[1024]; int bytesRead = in.read(buf); while (bytesRead != -1) { for (int i = 0;i < bytesRead; i++) { System.out.println((char) buf[i]); bytesRead = in.read(buf); } } in.close(); }
輸出結果:(略)
然後看看NIO的程式碼怎麼寫
案例是對應的NIO(這裡通過RandomAccessFile進行操作,當然也可以通過FileInputStream.getChannel()進行操作):
// NIO模式讀取檔案內容
public void NioTest() throws IOException {
RandomAccessFile aFile =
new RandomAccessFile("xx.txt", "tw");
FileChannel fileChannel = aFile.getChannel();
ByteBuffer buf = ByteBuffer.allocate(1024);
int bytesRead = fileChannel.read(buf);
System.out.println(bytesRead);
while (bytesRead != -1) {
buf.flip();
while (buf.hasRemaining()) {
System.out.println((char) buf.get());
}
buf.compact();
bytesRead = fileChannel.read(buf);
}
aFile.close();
}
輸出結果:(略) 通過程式碼的對比,應該能看出個大概,最起碼能發現NIO的實現方式比叫複雜。有了一個大概的印象可以進入下一步了。
NIO 核心元件
- Channel
- Buffer
- Selector
Channel
Java NIO Channel通道和流非常相似,主要有以下幾點區別:
- Channel是雙向的,也就是說可讀也可寫。Stream是單向的(只能讀或者寫)。
- Channel可以非同步讀寫。
- Channel總是基於緩衝區Buffer來讀寫。
Java NIO: Channels read data into Buffers, and Buffers write data into Channels
翻譯:Channels讀資料到Buffers,Buffers寫資料到Channels
這裡有個問題,就是往buffer裡寫資料為什麼是channel.read(buffer);呢,按理說“寫”對應的不應該是“write”嗎。我剛開始這裡也容易搞混。後來看了原始碼的解釋和重新思考了一下面向物件的思想,這個問題也就想明白了
- 首先面向物件的主體是物件,所有的操作都是圍繞著這個物件展開的。這裡channel就是物件,而read()這個方法是針對channel的。
- 再來看原始碼的解釋
注意這句:
Reads a sequence of bytes from this channel into the given buffer.
翻譯:從channel讀一些bytes到buffer裡。
小結:這個read()是針對channel而言的,意思就是從先從channel讀取一些bytes,然後再寫到buffer裡。
常用Channel
Channel就像他的名字一樣,只是個“管道”不儲存資料,是用來傳輸資料的,而且這種傳輸是雙向的,可以傳入也可以傳出。
下面列出Java NIO中最重要的集中Channel的實現:
- FileChannel:檔案的資料讀寫。
- DatagramChannel:UDP的資料讀寫。
- SocketChannel:TCP的資料讀寫。
- ServerSocketChannel:允許我們監聽TCP連結請求,每個請求會建立會一個SocketChannel.
Channel的基礎示例(Basic Channel Example)
這有一個利用FileChannel讀取資料到Buffer的例子:
注意:因為Channel要結合Buffer一起才能使用,所以這裡的Channel示例和下面的Buffer程式碼基本相同。更詳細的後面會講
RandomAccessFile aFile = new RandomAccessFile("xx.txt", "rw");
FileChannel inChannel = aFile.getChannel();
ByteBuffer buf = ByteBuffer.allocate(48);
int bytesRead = inChannel.read(buf);
while (bytesRead != -1) {
System.out.println("Read " + bytesRead);
buf.flip();
while(buf.hasRemaining()){
System.out.print((char) buf.get());
}
buf.clear();
bytesRead = inChannel.read(buf);
}
aFile.close();
注意buf.flip()的呼叫,
如果想要從Buffer讀取資料,要呼叫flip()方法,把Buffer從寫模式(write mode)切換到讀模式(read mode)。
後面會詳細講file()
Buffer
Buffer顧名思義:緩衝區,實際上是一個容器,一個連續陣列。
Channel提供從檔案、網路讀取資料的渠道,但是讀寫的資料都必須經過Buffer。如下圖:
向Buffer中寫資料有兩種方法:
- 從Channel寫到Buffer (fileChannel.read(buf))
- 通過Buffer的put()方法 (buf.put(…))
從Buffer中讀取資料同樣也有兩種方法:
- 從Buffer讀取到Channel (channel.write(buf))
- 使用get()方法從Buffer中讀取資料 (buf.get())
Buffer屬性:
- capacity
- position
- limit
- mark
這裡直接看原始碼
原始碼中這一局交代了他們之間的大小關係
// Invariants: mark <= position <= limit <= capacity
Buffer屬性解釋:
索引 |
說明 |
capacity |
緩衝區陣列的總長度。固定不變 |
position |
是下一個要讀取或寫入的元素的索引(注意!不是當前索引!)。0 <= position <= limit |
limit |
是讀\寫模式下可讀\寫的最大範圍。0 <= limit <= capacity |
mark |
用於記錄當前position的位置,用於恢復position的位置。預設為-1。後面會講 |
Java NIO Buffers用於和NIO Channel互動。正如你已經知道的,我們從channel中讀取資料到buffers裡,從buffer把資料寫入到channels.
buffer本質上就是一塊記憶體區,可以用來寫入資料,並在稍後讀取出來。這塊記憶體被NIO Buffer包裹起來,對外提供一系列的讀寫方便開發的介面。
Buffer基本用法(Basic Buffer Usage)
利用Buffer讀寫資料,通常遵循四個步驟:
- 把資料寫入buffer;
- 呼叫flip;
- 從Buffer中讀取資料;
- 呼叫buffer.clear()或者buffer.compact()
當寫入資料到buffer中時,buffer會記錄已經寫入的資料大小。當需要讀資料時,通過flip()方法把buffer從寫模式調整為讀模式;在讀模式下,可以讀取所有已經寫入的資料。
當讀取完資料後,需要清空buffer,以滿足後續寫入操作。清空buffer有兩種方式:呼叫clear()或compact()方法。clear會清空整個buffer,compact則只清空已讀取的資料,未被讀取的資料會被移動到buffer的開始位置,寫入位置則近跟著未讀資料之後。
讀資料
這裡有一個簡單的buffer案例,包括了write,flip和clear操作:
RandomAccessFile aFile = new RandomAccessFile("xx.txt", "rw");
FileChannel inChannel = aFile.getChannel();
//create buffer with capacity of 48 bytes
ByteBuffer buf = ByteBuffer.allocate(48);
int bytesRead = inChannel.read(buf); //read into buffer.
while (bytesRead != -1) {
buf.flip(); //make buffer ready for read
while(buf.hasRemaining()){
System.out.print((char) buf.get()); // read 1 byte at a time
}
buf.clear(); //make buffer ready for writing
bytesRead = inChannel.read(buf);
}
aFile.close();
寫資料
下面給出通過FileChannel來向檔案中寫入資料的一個例子:
File file = new File("data.txt");
FileOutputStream outputStream = new FileOutputStream(file);
FileChannel channel = outputStream.getChannel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
String string = "java nio";
buffer.put(string.getBytes());
buffer.flip(); //此處必須要呼叫buffer的flip方法
channel.write(buffer);
channel.close();
outputStream.close();
通過上面的程式會向工程目錄下的data.txt檔案寫入字串"java nio",注意在呼叫channel的write方法之前必須呼叫buffer的flip方法,否則無法正確寫入內容,至於具體原因將在下篇博文中具體講述Buffer的用法時闡述。
常見Buffer型別
Java NIO有如下具體的Buffer型別:
- ByteBuffer
- MappedByteBuffer
- CharBuffer
- DoubleBuffer
- FloatBuffer
- IntBuffer
- LongBuffer
- ShortBuffer
Buffer常見方法,flip()
沒什麼比直接看原始碼學習更高效的了,直接上原始碼
如果你英文不錯的,官方的介紹其實是很清楚的。
我來稍微解釋下:
- flip()方法可以吧Buffer從寫模式(write mode)切換到讀模式(read mode)。
- 呼叫flip方法會把設定limit為之前的position的值,並把position歸零。
- 如果mark之前被設定了,那麼呼叫這個方法之後它將會被拋棄。
不太懂?沒關係,下面會有詳細的圖解
capacity,limit,position三個屬性的關係與圖解
給Buffer分配記憶體大小為10的快取區。
Buffer的容量為10,所以capacity為10,在這裡指向索引為10的空間。
Buffer初始化的時候,limit和capacity指向同一最後的那個索引,position指向0。
往Buffer里加一個資料。position位置移動,capacity不變,limit不變。
繼續加到5個數據,position指向索引為5的第6個數據,capacity不變,limit不變。
執行flip()
這時候對照著,之前flip原始碼去看。把position的值賦給limit,所以limit=5,然後position=0。capacity不變。結果就是:
Buffer開始往外寫資料。每寫一個,position就下移一個位置,一直移到limit的位置,結束。
所以讀完之後的狀態如圖
上圖的順序就是程式碼中的buffer從初始化,到寫資料,再讀資料三個狀態下,capacity,position,limit三個屬性的變化和關係。 大家可以發現: 1. 0 <= position <= limit <= capacity 2. capacity始終不變
圖中很好的闡述了,Buffer讀寫切換的過程。即flip()的反轉原理。接下來我們從程式碼中檢測上面的分析過程。想一下下面程式碼列印的內容,然後執行一編程式碼看看對不對。
public static void main(String[] args) {
ByteBuffer buffer = ByteBuffer.allocate(10);
System.out.println("初始化");
System.out.println("position:" + buffer.position() +
",limit:" + buffer.limit() +
",capacity:" + buffer.capacity());
String str = "abcde";
buffer.put(str.getBytes());
System.out.println("填充資料完畢後");
System.out.println("position:" + buffer.position() +
",limit:" + buffer.limit() +
",capacity:" + buffer.capacity());
buffer.flip();
System.out.println("呼叫flip()後");
System.out.println("position:" + buffer.position() +
",limit:" + buffer.limit() +
",capacity:" + buffer.capacity());
System.out.println("開始讀取");
while (buffer.hasRemaining()) {
System.out.println("position:" + buffer.position() +
",limit:" + buffer.limit() +
",capacity:" + buffer.capacity());
System.out.println("元素:" + Character.toString((char) buffer.get()));
}
System.out.println("讀取完畢");
System.out.println("position:" + buffer.position() +
",limit:" + buffer.limit() +
",capacity:" + buffer.capacity());
}
執行結果
rewind()
直接上原始碼
原始碼講的很清楚,rewind()和flip()就相差一個
Limit = position;
clear() and compact()
一旦我們從buffer中讀取完資料,需要複用buffer為下次寫資料做準備。只需要呼叫clear或compact方法。
clear方法會重置position為0,limit為capacity,也就是整個Buffer清空。實際上Buffer中資料並沒有清空,我們只是把標記為修改了。
如果Buffer還有一些資料沒有讀取完,呼叫clear就會導致這部分資料被“遺忘”,因為我們沒有標記這部分資料未讀。
針對這種情況,如果需要保留未讀資料,那麼可以使用compact。 因此compact和clear的區別就在於對未讀資料的處理,是保留這部分資料還是一起清空。
mark() and reset()
提醒:mark()和reset()要搭配使用
使用步驟
- 先用mark()標記當前的position位置
- 後期再通過reset()恢復position到標記的位置。
mark原始碼
原始碼其實很清晰,就是 mark = position;
reset原始碼
把position恢復到之前通過mark()方法,標記的位置。
Channel與Buffer區別與聯絡
在NIO中並不是以流的方式來處理資料的,而是以buffer緩衝區和Channel管道配合使用來處理資料。
簡單理解一下:
- Channel管道比作成鐵路,buffer緩衝區比作成火車(運載著貨物)
而我們的NIO就是通過Channel管道運輸著儲存資料的Buffer緩衝區的來實現資料的處理!
- 要時刻記住:Channel不與資料打交道,它只負責運輸資料。與資料打交道的是Buffer緩衝區
- Channel-->運輸
- Buffer-->資料
相對於傳統IO而言,流是單向的。對於NIO而言,有了Channel管道這個概念,我們的讀寫都是雙向的(鐵路上的火車能從廣州去北京、自然就能從北京返還到廣州)!
Channel通道只負責傳輸資料、不直接操作資料的。操作資料都是通過Buffer緩衝區來進行操作!
Selector
Selector就是用來管理多個Channel的。
使用步驟:
- 先把要管理的Channel註冊到Selector上
- 然後Selector能夠檢測多個註冊的Channel上是否有事件發生
- 如果有事件發生,便獲取事件然後針對每個事件進行相應的響應處理
這樣一來,只是用一個單執行緒就可以管理多個通道,也就是管理多個連線。這樣使得只有在連線真正有讀寫事件發生時,才會呼叫函式來進行讀寫,就大大地減少了系統開銷,並且不必為每個連線都建立一個執行緒,不用去維護多個執行緒,並且避免了多執行緒之間的上下文切換導致的開銷。
與Selector有關的一個關鍵類是SelectionKey,一個SelectionKey表示一個到達的事件,這2個類構成了服務端處理業務的關鍵邏輯。
核心類
- Selector
- SelectionKey
先來看一下原始碼瞭解一下主要屬性和功能
Selector
SelectionKey
使用方法
建立Selector
Selector selector = Selector.open();
向Selector註冊通道
當向Selector中註冊Channel時,Channel必須是非阻塞的,所以不可以註冊FileChannel,因為FileChannel沒有實現SelectableChannel介面,不能配置為非阻塞狀態模式。
channel.configureBlocking(false);
SelectionKey key = channel.register(selector, Selectionkey.OP_READ);
當將通道Channel註冊到Selector中時,需要在第二個引數中指定相應觀察事件的集合interest集合,即SelectionKey中的幾個代表狀態的整數。如果想註冊多種狀態需要用位或(|)操作符進行連線。
SelectionKey.OP_CONNECT |
連線就緒 |
SelectionKey.OP_ACCEPT |
接收就緒 |
SelectionKey.OP_READ |
讀就緒 |
SelectionKey.OP_WRITE |
寫就緒 |
可以在註冊完Channel到Selector之後,通過獲取到的SelectionKey來獲取ready集合key.readyOps();,即可以得到所觀察事件是否就緒的一個位或操作之後的值,此時只需要使用相應的interest值與readyOps值取與(&)操作即可確定此事件是否就緒。或者使用JDK提供的四個API(而實際JDK底層程式碼也是取與操作進行判斷的)如下:
key.isConnectable();
key.isAcceptable();
key.isReadable();
key.isWritable();
SelectionKey中還可以新增一些附加物件來標識對應註冊的是哪個Channel。方法有兩種如下
Object attach = new Object();
// 方法1:在註冊時候的第三個引數指定
SelectionKey key = channel.register(selector, SelectionKey.OP_ACCEPT, attach);
// 方法2:使用attach()方法指定
key.attach(attach);
//獲取附加物件的方法
Object attached = key.attachment();
選擇通道
當向Selector中註冊了通道,就可以呼叫select來獲取有多少通道發生了我們所感興趣的(interest集合)事件和。該方法及其過載返回的int值表示有多少通道已經就緒。亦即,自上次呼叫select()方法後有多少通道變成就緒狀態。如果呼叫select()方法,因為有一個通道變成就緒狀態,返回了1,若再次呼叫select()方法,如果另一個通道就緒了,它會再次返回1。如果對第一個就緒的channel沒有做任何操作,現在就有兩個就緒的通道,但在每次select()方法呼叫之間,只有一個通道就緒了。
一旦select()方法返回非零,就可以通過selector.selectedKeys()方法獲得所有的以選擇即已就緒SelectionKey,通過遍歷獲取到是SelectionKey的哪個事件就緒。注意每次迭代末尾的keyIterator.remove()呼叫。Selector不會自己從已選擇鍵集中移除SelectionKey例項。必須在處理完通道時自己移除。下次該通道變成就緒時,Selector會再次將其放入已選擇鍵集中。
// 阻塞到至少有一個通道在你註冊的事件上就緒了。
selector.select();
// 和select()一樣,除了最長會阻塞timeout毫秒(引數)。
selector.select(1000);
// 不會阻塞,不管什麼通道就緒都立刻返回,如果無通道就緒,則立即返回零
selector.selectNow();
// 遍歷就緒的SelectionKey
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> iterator = keys.iterator();
while (iterator.hasNext()) {
SelectionKey selectedKey = iterator.next();
if (key.isAcceptable()) {
// a connection was accepted by a ServerSocketChannel.
} else if (key.isConnectable()) {
// a connection was established with a remote server.
} else if (key.isReadable()) {
// a channel is ready for reading
} else if (key.isWritable()) {
// a channel is ready for writing
}
iterator.remove();
}
喚醒Selector
某個執行緒呼叫select()方法後阻塞了,即使沒有通道已經就緒,也有辦法讓其從select()方法返回。只要讓其它執行緒在第一個執行緒呼叫select()方法的那個物件上呼叫Selector.wakeup()方法即可。阻塞在select()方法上的執行緒會立馬返回。
如果有其它執行緒呼叫了wakeup()方法,但當前沒有執行緒阻塞在select()方法上,下個呼叫select()方法的執行緒會立即“醒來(wake up)”。
selector.wakeup();
關閉Selector
用完Selector後呼叫其close()方法會關閉該Selector,且使註冊到該Selector上的所有SelectionKey例項無效。通道本身並不會關閉。
selector.close();
Selector程式碼實戰
後面講Reactor模式的時候會有一個綜合程式碼
為什麼使用Selector(Why Use a Selector?)
用單執行緒處理多個channels的好處是我需要更少的執行緒來處理channel。實際上,你甚至可以用一個執行緒來處理所有的channels。從作業系統的角度來看,切換執行緒開銷是比較昂貴的,並且每個執行緒都需要佔用系統資源,因此暫用執行緒越少越好。
需要留意的是,現代作業系統和CPU在多工處理上已經變得越來越好,所以多執行緒帶來的影響也越來越小。如果一個CPU是多核的,如果不執行多工反而是浪費了機器的效能。不過這些設計討論是另外的話題了。簡而言之,通過Selector我們可以實現單執行緒操作多個channel。
這有一幅示意圖,描述了單執行緒處理三個channel的情況:
Java NIO: A Thread uses a Selector to handle 3 Channel's
其餘功能介紹
看完以上陳述,詳細大家對NIO有了一定的瞭解,下面主要通過幾個案例,來說明NIO的其餘功能,下面程式碼量偏多,功能性講述偏少。
零拷貝
Java NIO中提供的FileChannel擁有transferTo和transferFrom兩個方法,可直接把FileChannel中的資料拷貝到另外一個Channel,或者直接把另外一個Channel中的資料拷貝到FileChannel。該介面常被用於高效的網路/檔案的資料傳輸和大檔案拷貝。在作業系統支援的情況下,通過該方法傳輸資料並不需要將源資料從核心態拷貝到使用者態,再從使用者態拷貝到目標通道的核心態,同時也避免了兩次使用者態和核心態間的上下文切換,也即使用了“零拷貝”,所以其效能一般高於Java IO中提供的方法。
使用FileChannel的零拷貝將本地檔案內容傳輸到網路的示例程式碼如下所示。
public class NIOClient {
public static void main(String[] args) throws IOException, InterruptedException {
SocketChannel socketChannel = SocketChannel.open();
InetSocketAddress address = new InetSocketAddress(1234);
socketChannel.connect(address);
RandomAccessFile file = new RandomAccessFile(
NIOClient.class.getClassLoader().getResource("test.txt").getFile(), "rw");
FileChannel channel = file.getChannel();
channel.transferTo(0, channel.size(), socketChannel);
channel.close();
file.close();
socketChannel.close();
}
}
Scatter/Gatter
分散(scatter)從Channel中讀取是指在讀操作時將讀取的資料寫入多個buffer中。因此,Channel將從Channel中讀取的資料“分散(scatter)”到多個Buffer中。
聚集(gather)寫入Channel是指在寫操作時將多個buffer的資料寫入同一個Channel,因此,Channel 將多個Buffer中的資料“聚集(gather)”後傳送到Channel。
scatter / gather經常用於需要將傳輸的資料分開處理的場合,例如傳輸一個由訊息頭和訊息體組成的訊息,你可能會將訊息體和訊息頭分散到不同的buffer中,這樣你可以方便的處理訊息頭和訊息體。
案例:
public class ScattingAndGather
{
public static void main(String args[]){
gather();
}
public static void gather()
{
ByteBuffer header = ByteBuffer.allocate(10);
ByteBuffer body = ByteBuffer.allocate(10);
byte [] b1 = {'0', '1'};
byte [] b2 = {'2', '3'};
header.put(b1);
body.put(b2);
ByteBuffer [] buffs = {header, body};
try
{
FileOutputStream os = new FileOutputStream("src/scattingAndGather.txt");
FileChannel channel = os.getChannel();
channel.write(buffs);
}
catch (IOException e)
{
e.printStackTrace();
}
}
}
Pipe
Java NIO 管道是2個執行緒之間的單向資料連線。Pipe有一個source通道和一個sink通道。
資料會被寫到sink通道,從source通道讀取。
public static void method1() throws IOException {
Pipe pipe = Pipe.open();
ExecutorService exec = Executors.newFixedThreadPool(2);
final Pipe pipeTemp = pipe;
exec.submit(() -> {
// 向通道中寫資料
Pipe.SinkChannel sinkChannel = pipeTemp.sink();
while (true) {
TimeUnit.SECONDS.sleep(1);
String newData = "Pipe Test At Time " + System.currentTimeMillis();
ByteBuffer buf = ByteBuffer.allocate(1024);
buf.clear();
buf.put(newData.getBytes());
buf.flip();
while (buf.hasRemaining()) {
System.out.println(buf);
sinkChannel.write(buf);
}
}
});
exec.submit(() -> {
// 向通道中讀資料
Pipe.SourceChannel sourceChannel =
pipeTemp.source();
while (true) {
TimeUnit.SECONDS.sleep(1);
ByteBuffer buf = ByteBuffer.allocate(1024);
buf.clear();
int bytesRead = sourceChannel.read(buf);
System.out.println("bytesRead=" + bytesRead);
while (bytesRead > 0) {
buf.flip();
byte b[] = new byte[bytesRead];
int i = 0;
while (buf.hasRemaining()) {
b[i] = buf.get();
System.out.println("%X" + b[i]);
i++;
}
String s = new String(b);
System.out.println("===============||" + s);
bytesRead = sourceChannel.read(buf);
}
}
});
exec.shutdown();
}
兩種高效能IO設計模式
在傳統的網路服務設計模式中,有兩種比較經典的模式:
一種是 多執行緒,一種是執行緒池。
多執行緒
對於多執行緒模式,也就說來了client,伺服器就會新建一個執行緒來處理該client的讀寫事件,如下圖所示:
這種模式雖然處理起來簡單方便,但是由於伺服器為每個client的連線都採用一個執行緒去處理,使得資源佔用非常大。因此,當連線數量達到上限時,再有使用者請求連線,直接會導致資源瓶頸,嚴重的可能會直接導致伺服器崩潰。
執行緒池
因此,為了解決這種一個執行緒對應一個客戶端模式帶來的問題,提出了採用執行緒池的方式,也就說建立一個固定大小的執行緒池,來一個客戶端,就從執行緒池取一個空閒執行緒來處理,當客戶端處理完讀寫操作之後,就交出對執行緒的佔用。因此這樣就避免為每一個客戶端都要建立執行緒帶來的資源浪費,使得執行緒可以重用。
但是執行緒池也有它的弊端,如果連線大多是長連線,因此可能會導致在一段時間內,執行緒池中的執行緒都被佔用,那麼當再有使用者請求連線時,由於沒有可用的空閒執行緒來處理,就會導致客戶端連線失敗,從而影響使用者體驗。因此,執行緒池比較適合大量的短連線應用。
因此便出現了下面的兩種高效能IO設計模式:Reactor和Proactor。
Reactor
在Reactor模式中,會先對每個client註冊感興趣的事件,然後有一個執行緒專門去輪詢每個client是否有事件發生,當有事件發生時,便順序處理每個事件,當所有事件處理完之後,便再轉去繼續輪詢,如下圖所示:
從這裡可以看出,多路複用IO就是採用Reactor模式。注意,上面的圖中展示的 是順序處理每個事件,當然為了提高事件處理速度,可以通過多執行緒或者執行緒池的方式來處理事件。
Proactor
在Proactor模式中,當檢測到有事件發生時,會新起一個非同步操作,然後交由核心執行緒去處理,當核心執行緒完成IO操作之後,傳送一個通知告知操作已完成,可以得知,非同步IO模型採用的就是Proactor模式。
實戰Reactor模式
以搭建一個伺服器為例子
- 經典Reactor模式
- 多執行緒Reactor模式
經典Reactor模式
經典的Reactor模式示意圖如下所示。
在Reactor模式中,包含如下角色
- Reactor 將I/O事件發派給對應的Handler
- Acceptor 處理客戶端連線請求
- Handlers 執行非阻塞讀/寫
最簡單的Reactor模式實現程式碼如下所示。
public static void main(String[] args) throws IOException {
Selector selector = Selector.open();
ServerSocketChannel serverSocketChannel
= ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.bind(new InetSocketAddress(1234));
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
while (selector.select() > 0) {
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> iterator = keys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
if (key.isAcceptable()) {
ServerSocketChannel acceptServerSocketChannel
= (ServerSocketChannel) key.channel();
SocketChannel socketChannel =
acceptServerSocketChannel.accept();
System.out.println("Accept request from " + socketChannel.getRemoteAddress());
socketChannel.register(selector, SelectionKey.OP_READ);
} else if (key.isReadable()) {
SocketChannel socketChannel = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
int count = socketChannel.read(buffer);
if (count <= 0) {
socketChannel.close();
key.cancel();
System.out.println("Received invalid data, close the connection");
continue;
}
}
keys.remove(key);
}
}
}
為了方便閱讀,上示程式碼將Reactor模式中的所有角色放在了一個類中。
從上示程式碼中可以看到,多個Channel可以註冊到同一個Selector物件上,實現了一個執行緒同時監控多個請求狀態(Channel)。同時註冊時需要指定它所關注的事件,例如上示程式碼中socketServerChannel物件只註冊了OP_ACCEPT事件,而socketChannel物件只註冊了OP_READ事件。
selector.select()是阻塞的,當有至少一個通道可用時該方法返回可用通道個數。同時該方法只捕獲Channel註冊時指定的所關注的事件。
多工作執行緒Reactor模式
經典Reactor模式中,儘管一個執行緒可同時監控多個請求(Channel),但是所有讀/寫請求以及對新連線請求的處理都在同一個執行緒中處理,無法充分利用多CPU的優勢,同時讀/寫操作也會阻塞對新連線請求的處理。因此可以引入多執行緒,並行處理多個讀/寫操作,如下圖所示。
Netty中使用的Reactor模式,引入了多Reactor,也即一個主Reactor負責監控所有的連線請求,多個子Reactor負責監控並處理讀/寫請求,減輕了主Reactor的壓力,降低了主Reactor壓力太大而造成的延遲。
並且每個子Reactor分別屬於一個獨立的執行緒,每個成功連線後的Channel的所有操作由同一個執行緒處理。這樣保證了同一請求的所有狀態和上下文在同一個執行緒中,避免了不必要的上下文切換,同時也方便了監控請求響應狀態。
多執行緒Reactor模式示例程式碼如下所示。
多執行緒Reactor程式碼:
public static void main(String[] args) throws IOException {
Selector selector = Selector.open();
ServerSocketChannel serverSocketChannel =
ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.bind(new InetSocketAddress(1234));
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
if (selector.selectNow() < 0) {
continue;
}
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> iterator = keys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
if (key.isAcceptable()) {
ServerSocketChannel acceptServerSocketChannel =
(ServerSocketChannel) key.channel();
SocketChannel socketChannel = acceptServerSocketChannel.accept();
socketChannel.configureBlocking(false);
System.out.println("Accept request from " + socketChannel.getRemoteAddress());
SelectionKey readKey = socketChannel.register(selector,
SelectionKey.OP_READ);
readKey.attach(new Processor());
} else if (key.isReadable()) {
Processor processor = (Processor) key.attachment();
processor.process(key);
}
}
}
}
從上示程式碼中可以看到,註冊完SocketChannel的OP_READ事件後,可以對相應的SelectionKey attach一個物件(本例中attach了一個Processor物件,該物件處理讀請求),並且在獲取到可讀事件後,可以取出該物件。
注:attach物件及取出該物件是NIO提供的一種操作,但該操作並非Reactor模式的必要操作,本文使用它,只是為了方便演示NIO的介面。
具體的讀請求處理在如下所示的Processor類中。該類中設定了一個靜態的執行緒池處理所有請求。而process方法並不直接處理I/O請求,而是把該I/O操作提交給上述執行緒池去處理,這樣就充分利用了多執行緒的優勢,同時將對新連線的處理和讀/寫操作的處理放在了不同的執行緒中,讀/寫操作不再阻塞對新連線請求的處理。
public class Processor {
private static final ExecutorService service =
Executors.newFixedThreadPool(16);
public void process(final SelectionKey selectionKey) {
service.submit(() -> {
ByteBuffer buffer = ByteBuffer.allocate(1024);
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
int count = socketChannel.read(buffer);
if (count < 0) {
socketChannel.close();
selectionKey.cancel();
System.out.println("Read ended" + socketChannel);
return null;
} else if (count == 0) {
return null;
}
return null;
});
}
}
參考: