Nio使用Selector客戶端與服務器的通信
使用NIO的一個最大優勢就是客戶端於服務器自己的不再是阻塞式的,也就意味著服務器無需通過為每個客戶端的鏈接而開啟一個線程。而是通過一個叫Selector的輪循器來不斷的檢測那個Channel有消息處理。
簡單來講,Selector會不斷地輪詢註冊在其上的Channel,如果某個Channel上面有新的TCP連接接入、讀和寫事件,這個Channel就處於就緒狀態,會被Selector輪詢出來,然後通過SelectionKey可以獲取就緒Channel的Set集合,進行後續的I/O操作。
由於select操作只管對selectedKeys的集合進行添加而不負責移除,所以當某個消息被處理後我們需要從該集合裏去掉。
一個多路復用器Selector可以同時輪詢多個Channel,由於JDK使用了epoll()代替傳統的select實現,所以它並沒有最大連接句柄1024/2048的限制。這也就意味著只需要一個線程負責Selector的輪詢,就可以接入成千上萬的客戶端,這確實是個非常巨大的進步。
下面,我們通過NIO編程的序列圖和源碼分析來熟悉相關的概念,以便鞏固我們前面所學的NIO基礎知識。
è¿éåå¾çæè¿°
下面,我們對NIO服務端的主要創建過程進行講解和說明,作為NIO的基礎入門,我們將忽略掉一些在生產環境中部署所需要的一些特性和功能(比如TCP半包等問題)。
步驟一:打開ServerSocketChannel,用於監聽客戶端的連接,它是所有客戶端連接的父管道,代碼示例如下。
ServerSocketChannel server = ServerSocketChannel.open();
步驟二:綁定監聽端口,設置連接為非阻塞模式,示例代碼如下。
server.socket().bind(new InetSocketAddress(7777),1024);
// 設置為非阻塞模式, 這個非常重要
server.configureBlocking(false);
步驟三:創建Reactor線程,創建多路復用器並啟動線程,代碼如下。(即 selector)
Selector selector = Selector.open();
new Thread(new ReactorTask()).start();
步驟四:將ServerSocketChannel註冊到Reactor線程的多路復用器Selector上,監聽ACCEPT事件,代碼如下。
server.register(selector, SelectionKey.OP_ACCEPT);
步驟五:多路復用器在線程run方法的無限循環體內輪詢準備就緒的Key,代碼如下。
while(true){
selector.select(1000);
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> it = keys.iterator();
SelectionKey key = null;
while (it.hasNext()) {
key = (SelectKey)it.next();
//處理io
}
}
步驟六:多路復用器監聽到有新的客戶端接入,處理新的接入請求,完成TCP三次握手,建立物理鏈路,代碼示例如下。
// 得到與客戶端的套接字通道
SocketChannel channel = ssc.accept();
步驟七:設置客戶端鏈路為非阻塞模式,示例代碼如下。
channel.configureBlocking(false);
步驟八:將新接入的客戶端連接註冊到Reactor線程的多路復用器上,監聽讀操作,用來讀取客戶端發送的網絡消息,代碼如下。
channel.register(selector, SelectionKey.OP_READ);
步驟九:異步讀取客戶端請求消息到緩沖區,示例代碼如下。
int readBytes = channel.read(byteBuffer);
步驟十:對ByteBuffer進行編解碼,如果有半包消息指針reset,繼續讀取後續的報文,將解碼成功的消息封裝成Task,投遞到業務線程池中,進行業務邏輯編排,示例代碼如下。
Object message = null;
while (buffer.hasRemain()) {
buffer.mark();
message = decode(buffer);
if(message == null){
buffer.reset();
break;
}
}
if(!buffer.hasRemain()){
buffer.clear();
}else {
buffer.compact();
}
//業務線程處理message
步驟十一:將POJO對象encode成ByteBuffer,調用SocketChannel的異步write接口,將消息異步發送給客戶端,示例代碼如下。
socketChannel.write(byteBuffer);
註意:如果發送區TCP緩沖區滿,會導致寫半包,此時,需要註冊監聽寫操作位,循環寫,直到整包消息寫入TCP緩沖區,此處不贅述,後續會詳細分析Netty的處理策略。
package chanel; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.nio.charset.Charset; import java.util.Iterator; import java.util.Set; public class NioService { public static void main(String[] args) { try { NioService server = new NioService(); server.init(); } catch (IOException e) { e.printStackTrace(); } } public void init() throws IOException { Charset charset = Charset.forName("UTF-8"); // 創建一個選擇器,可用close()關閉,isOpen()表示是否處於打開狀態,他不隸屬於當前線程 Selector selector = Selector.open(); // 創建ServerSocketChannel,並把它綁定到指定端口上 ServerSocketChannel server = ServerSocketChannel.open(); server.socket().bind(new InetSocketAddress(7777),1024); // 設置為非阻塞模式, 這個非常重要 server.configureBlocking(false); // 在選擇器裏面註冊關註這個服務器套接字通道的accept事件 // ServerSocketChannel只有OP_ACCEPT可用,OP_CONNECT,OP_READ,OP_WRITE用於SocketChannel server.register(selector, SelectionKey.OP_ACCEPT); while (true) { //休眠時間為1s,無論是否有讀寫等事件發生,selector每隔1s都被喚醒一次 selector.select(1000); Set<SelectionKey> keys = selector.selectedKeys(); Iterator<SelectionKey> it = keys.iterator(); SelectionKey key = null; while (it.hasNext()) { //如果key對應的Channel包含客戶端的鏈接請求 // OP_ACCEPT 這個只有ServerSocketChannel才有可能觸發 key=it.next(); // 由於select操作只管對selectedKeys進行添加,所以key處理後我們需要從裏面把key去掉 it.remove(); if (key.isAcceptable()) { ServerSocketChannel ssc = (ServerSocketChannel) key.channel(); // 得到與客戶端的套接字通道 //ServerSocketChannel的accept接收客戶端的連接請求並創建SocketChannel實例,完成上述操作後,相當於完成了TCP的三次握手,TCP物理鏈路正式建立。 //我們需要將新創建的SocketChannel設置為異步非阻塞,同時也可以對其TCP參數進行設置,例如TCP接收和發送緩沖區的大小等。此處省掉 SocketChannel channel = ssc.accept(); channel.configureBlocking(false); channel.register(selector, SelectionKey.OP_READ); //將key對應Channel設置為準備接受其他請求 key.interestOps(SelectionKey.OP_ACCEPT); } if (key.isReadable()) { SocketChannel channel = (SocketChannel) key.channel(); ByteBuffer byteBuffer = ByteBuffer.allocate(1024); String content = ""; try { int readBytes = channel.read(byteBuffer); if (readBytes > 0) { byteBuffer.flip(); //為write()準備 byte[] bytes = new byte[byteBuffer.remaining()]; byteBuffer.get(bytes); content+=new String(bytes); System.out.println(content); //回應客戶端 doWrite(channel); } // 寫完就把狀態關註去掉,否則會一直觸發寫事件(改變自身關註事件) key.interestOps(SelectionKey.OP_READ); } catch (IOException i) { //如果捕獲到該SelectionKey對應的Channel時出現了異常,即表明該Channel對於的Client出現了問題 //所以從Selector中取消該SelectionKey的註冊 key.cancel(); if (key.channel() != null) { key.channel().close(); } } } } } } private void doWrite(SocketChannel sc) throws IOException{ byte[] req ="服務器已接受aaa".getBytes(); ByteBuffer byteBuffer = ByteBuffer.allocate(req.length); byteBuffer.put(req); byteBuffer.flip(); sc.write(byteBuffer); if(!byteBuffer.hasRemaining()){ System.out.println("Send 2 Service successed"); } } }
現在我們來看編寫客戶端的流程:
è¿éåå¾çæè¿°
步驟一:打開SocketChannel,綁定客戶端本地地址(可選,默認系統會隨機分配一個可用的本地地址),示例代碼如下。
SocketChannel channel = SocketChannel.open();
步驟二:設置SocketChannel為非阻塞模式,同時設置客戶端連接的TCP參數,示例代碼如下。
channel.configureBlocking(false);
步驟三:異步連接服務端,示例代碼如下。
步驟四:判斷是否連接成功,如果連接成功,則直接註冊讀狀態位到多路復用器中,如果當前沒有連接成功(異步連接,返回false,說明客戶端已經發送sync包,服務端沒有返回ack包,物理鏈路還沒有建立),示例代碼如下。
if(channel.connect(new InetSocketAddress("127.0.0.1",7777))){
channel.register(selector, SelectionKey.OP_READ);
//發送消息
doWrite(channel, "66666666");
}else {
channel.register(selector, SelectionKey.OP_CONNECT);
}
步驟五:向Reactor線程的多路復用器註冊OP_CONNECT狀態位,監聽服務端的TCP ACK應答,示例代碼如下。
channel.register(selector, SelectionKey.OP_CONNECT);
步驟六:創建Reactor線程,創建多路復用器並啟動線程,代碼如下。
selector = Selector.open();
new Thread(new ReactorTask()).start();
步驟七:多路復用器在線程run方法的無限循環體內輪詢準備就緒的Key,代碼如下。
while (!stop){
selector.select(1000);
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> it = keys.iterator();
SelectionKey key = null;
while (it.hasNext()){
}
}
步驟八:接收connect事件進行處理並判斷是否鏈接成功,示例代碼如下。
if (key.isConnectable()){
if (channel.finishConnect()) {
}
}
步驟九:註冊讀事件到多路復用器,示例代碼如下。
channel.register(selector, SelectionKey.OP_READ);
步驟十:異步讀客戶端請求消息到緩沖區,示例代碼如下。
int readBytes = channel.read(byteBuffer);
步驟十一:對ByteBuffer進行編解碼,如果有半包消息指針reset,繼續讀取後續的報文,將解碼成功的消息封裝成Task,投遞到業務線程池中,進行業務邏輯編排,示例代碼如下。
Object message = null;
while (buffer.hasRemain()) {
buffer.mark();
message = decode(buffer);
if(message == null){
buffer.reset();
break;
}
}
if(!buffer.hasRemain()){
buffer.clear();
}else {
buffer.compact();
}
//業務線程處理message
步驟十二:將發生對象encode成ByteBuffer,調用SocketChannel的異步write接口,將消息異步發送給客戶端,示例代碼如下。
socketChannel.write(byteBuffer);
package chanel; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.nio.charset.Charset; import java.util.Iterator; import java.util.Set; import java.util.concurrent.ArrayBlockingQueue; public class NioClient { public static void main(String[] args) { try { NioClient client = new NioClient(); client.init(); } catch (IOException e) { e.printStackTrace(); } } // 創建一個套接字通道,註意這裏必須使用無參形式 private Selector selector = null; static Charset charset = Charset.forName("UTF-8"); private volatile boolean stop = false; public ArrayBlockingQueue<String> arrayQueue = new ArrayBlockingQueue<String>(8); public void init() throws IOException { selector = Selector.open(); SocketChannel channel = SocketChannel.open(); // 設置為非阻塞模式,這個方法必須在實際連接之前調用(所以open的時候不能提供服務器地址,否則會自動連接) channel.configureBlocking(false); if (channel.connect(new InetSocketAddress("127.0.0.1", 7777))) { channel.register(selector, SelectionKey.OP_READ); //發送消息 doWrite(channel, "66666666"); } else { channel.register(selector, SelectionKey.OP_CONNECT); } //啟動一個接受服務器反饋的線程 // new Thread(new ReceiverInfo()).start(); while (!stop) { selector.select(1000); Set<SelectionKey> keys = selector.selectedKeys(); Iterator<SelectionKey> it = keys.iterator(); SelectionKey key = null; while (it.hasNext()) { key = it.next(); it.remove(); SocketChannel sc = (SocketChannel) key.channel(); // OP_CONNECT 兩種情況,鏈接成功或失敗這個方法都會返回true if (key.isConnectable()) { // 由於非阻塞模式,connect只管發起連接請求,finishConnect()方法會阻塞到鏈接結束並返回是否成功 // 另外還有一個isConnectionPending()返回的是是否處於正在連接狀態(還在三次握手中) if (channel.finishConnect()) { /* System.out.println("準備發送數據"); // 鏈接成功了可以做一些自己的處理 channel.write(charset.encode("I am Coming")); // 處理完後必須吧OP_CONNECT關註去掉,改為關註OP_READ key.interestOps(SelectionKey.OP_READ);*/ sc.register(selector, SelectionKey.OP_READ); // new Thread(new DoWrite(channel)).start(); doWrite(channel, "66666666"); } else { //鏈接失敗,進程推出或直接拋出IOException System.exit(1); } } if (key.isReadable()) { //讀取服務端的響應 ByteBuffer buffer = ByteBuffer.allocate(1024); int readBytes = sc.read(buffer); String content = ""; if (readBytes > 0) { buffer.flip(); byte[] bytes = new byte[buffer.remaining()]; buffer.get(bytes); content += new String(bytes); stop = true; } else if (readBytes < 0) { //對端鏈路關閉 key.channel(); sc.close(); } System.out.println(content); key.interestOps(SelectionKey.OP_READ); } } } } private void doWrite(SocketChannel sc, String data) throws IOException { byte[] req = data.getBytes(); ByteBuffer byteBuffer = ByteBuffer.allocate(req.length); byteBuffer.put(req); byteBuffer.flip(); sc.write(byteBuffer); if (!byteBuffer.hasRemaining()) { System.out.println("Send 2 client successed"); } } }
啟動客戶端類的init()方法之後就會向服務器發送一個字符串,服務器接受到之後會向客戶端回應一個。運行如上代碼,結果正確。
通過源碼對比分析,我們發現NIO編程難度確實比同步阻塞BIO大很多,我們的NIO例程並沒有考慮“半包讀”和“半包寫”,如果加上這些,代碼將會更加復雜。NIO代碼既然這麽復雜,為什麽它的應用卻越來越廣泛呢,使用NIO編程的優點總結如下。
(1)客戶端發起的連接操作是異步的,可以通過在多路復用器註冊OP_CONNECT等待後續結果,不需要像之前的客戶端那樣被同步阻塞。
(2)SocketChannel的讀寫操作都是異步的,如果沒有可讀寫的數據它不會同步等待,直接返回,這樣I/O通信線程就可以處理其他的鏈路,不需要同步等待這個鏈路可用。
3)線程模型的優化:由於JDK的Selector在Linux等主流操作系統上通過epoll實現,它沒有連接句柄數的限制(只受限於操作系統的最大句柄數或者對單個進程的句柄限制),這意味著一個Selector線程可以同時處理成千上萬個客戶端連接,而且性能不會隨著客戶端的增加而線性下降,因此,它非常適合做高性能、高負載的網絡服務器。
JDK1.7升級了NIO類庫,升級後的NIO類庫被稱為NIO2.0,引人註目的是,Java正式提供了異步文件I/O操作,同時提供了與UNIX網絡編程事件驅動I/O對應的AIO,後續我們學習下如何利用NIO2.0編寫AIO程序,還是以客戶端服務器通信為例進行講解。
Nio使用Selector客戶端與服務器的通信