Java入門系列-25-NIO(實現非阻塞網路通訊)
還記得之前介紹IO/">NIO時對比傳統IO的一大特點嗎?就是NIO是非阻塞式的,這篇文章帶大家來看一下非阻塞的網路操作。
補充:以陣列的形式使用緩衝區
package testnio; import java.io.IOException; import java.io.RandomAccessFile; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; public class TestBufferArray { public static void main(String[] args) throws IOException { RandomAccessFile raf1=new RandomAccessFile("D:/1.txt","rw"); //1.獲取通道 FileChannel channel1=raf1.getChannel(); //2.建立緩衝區陣列 ByteBuffer buf1=ByteBuffer.allocate(512); ByteBuffer buf2=ByteBuffer.allocate(512); ByteBuffer[] bufs= {buf1,buf2}; //3.將資料讀入緩衝區陣列 channel1.read(bufs); for (ByteBuffer byteBuffer : bufs) { byteBuffer.flip(); } System.out.println(new String(bufs[0].array(),0,bufs[0].limit())); System.out.println("-----------"); System.out.println(new String(bufs[1].array(),0,bufs[1].limit())); //寫入緩衝區陣列到通道中 RandomAccessFile raf2=new RandomAccessFile("D:/2.txt","rw"); FileChannel channel2=raf2.getChannel(); channel2.write(bufs); } }
使用NIO實現阻塞式網路通訊
TCP協議的網路通訊傳統實現方式是通過套接字程式設計(Socket和ServerSocket),NIO實現TCP網路通訊需要用到 Channel 介面的兩個實現類:SocketChannel和ServerSocketChannel
使用NIO實現阻塞式網路通訊
客戶端
package com.jikedaquan.blockingnio; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.nio.channels.SocketChannel; import java.nio.file.Paths; import java.nio.file.StandardOpenOption; public class Client { public static void main(String[] args) { SocketChannel sChannel=null; FileChannel inChannel=null; try { //1、獲取通道 sChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 1666)); //用於讀取檔案 inChannel = FileChannel.open(Paths.get("F:/a.jpg"), StandardOpenOption.READ); //2、分配指定大小的緩衝區 ByteBuffer buf=ByteBuffer.allocate(1024); //3、讀取本地檔案,傳送到伺服器端 while(inChannel.read(buf)!=-1) { buf.flip(); sChannel.write(buf); buf.clear(); } } catch (IOException e) { e.printStackTrace(); }finally { //關閉通道 if (inChannel!=null) { try { inChannel.close(); } catch (IOException e) { e.printStackTrace(); } } if(sChannel!=null) { try { sChannel.close(); } catch (IOException e) { e.printStackTrace(); } } } } }
new InetSocketAddress("127.0.0.1", 1666) 用於向客戶端套接字通道(SocketChannel)繫結要連線地址和埠
服務端
package com.jikedaquan.blockingnio; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.nio.file.Paths; import java.nio.file.StandardOpenOption; public class Server { public static void main(String[] args) { ServerSocketChannel ssChannel=null; FileChannel outChannel=null; SocketChannel sChannel=null; try { //1、獲取通道 ssChannel = ServerSocketChannel.open(); //用於儲存檔案的通道 outChannel = FileChannel.open(Paths.get("F:/b.jpg"), StandardOpenOption.WRITE,StandardOpenOption.CREATE); //2、繫結要監聽的埠號 ssChannel.bind(new InetSocketAddress(1666)); //3、獲取客戶端連線的通道 sChannel = ssChannel.accept(); //4、分配指定大小的緩衝區 ByteBuffer buf=ByteBuffer.allocate(1024); //5、接收客戶端的資料,並儲存到本地 while(sChannel.read(buf)!=-1) { buf.flip(); outChannel.write(buf); buf.clear(); } } catch (IOException e) { e.printStackTrace(); }finally { //6、關閉通道 if(sChannel!=null) { try { sChannel.close(); } catch (IOException e) { e.printStackTrace(); } } if(outChannel!=null) { try { outChannel.close(); } catch (IOException e) { e.printStackTrace(); } } if(ssChannel!=null) { try { ssChannel.close(); } catch (IOException e) { e.printStackTrace(); } } } } }
服務端套接字僅繫結要監聽的埠即可ssChannel.bind(new InetSocketAddress(1666));
上面的程式碼使用NIO實現的網路通訊,可能有同學會問,沒有看到阻塞效果啊,確實是阻塞式的看不到效果,因為客戶端傳送一次資料就結束了,服務端也是接收一次資料就結束了。那如果服務端接收完成資料後,再向客戶端反饋呢?
能夠看到阻塞效果的網路通訊
客戶端
package com.jikedaquan.blockingnio2; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.nio.channels.SocketChannel; import java.nio.file.Paths; import java.nio.file.StandardOpenOption; public class Client { public static void main(String[] args) { SocketChannel sChannel=null; FileChannel inChannel=null; try { sChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 1666)); inChannel = FileChannel.open(Paths.get("F:/a.jpg"), StandardOpenOption.READ); ByteBuffer buf=ByteBuffer.allocate(1024); while(inChannel.read(buf)!=-1) { buf.flip(); sChannel.write(buf); buf.clear(); } //sChannel.shutdownOutput();//去掉註釋掉將不會阻塞 //接收伺服器端的反饋 int len=0; while((len=sChannel.read(buf))!=-1) { buf.flip(); System.out.println(new String(buf.array(),0,len)); buf.clear(); } } catch (IOException e) { e.printStackTrace(); }finally { if(inChannel!=null) { try { inChannel.close(); } catch (IOException e) { e.printStackTrace(); } } if(sChannel!=null) { try { sChannel.close(); } catch (IOException e) { e.printStackTrace(); } } } } }
服務端
package com.jikedaquan.blockingnio2; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.nio.file.Paths; import java.nio.file.StandardOpenOption; public class Server { public static void main(String[] args) { ServerSocketChannel ssChannel=null; FileChannel outChannel=null; SocketChannel sChannel=null; try { ssChannel = ServerSocketChannel.open(); outChannel = FileChannel.open(Paths.get("F:/a.jpg"),StandardOpenOption.WRITE,StandardOpenOption.CREATE); ssChannel.bind(new InetSocketAddress(1666)); sChannel = ssChannel.accept(); ByteBuffer buf=ByteBuffer.allocate(1024); while(sChannel.read(buf)!=-1) { buf.flip(); outChannel.write(buf); buf.clear(); } //傳送反饋給客戶端 buf.put("服務端接收資料成功".getBytes()); buf.flip(); sChannel.write(buf); } catch (IOException e) { e.printStackTrace(); }finally { if(sChannel!=null) { try { sChannel.close(); } catch (IOException e) { e.printStackTrace(); } } if(outChannel!=null) { try { outChannel.close(); } catch (IOException e) { e.printStackTrace(); } } if(ssChannel!=null) { try { ssChannel.close(); } catch (IOException e) { e.printStackTrace(); } } } } }
服務端將向客戶端傳送兩次資料
選擇器(Selector)
想要實現非阻塞的IO,必須要先弄懂選擇器。Selector 抽象類,可通過呼叫此類的 open 方法建立選擇器,該方法將使用系統的預設選擇器提供者建立新的選擇器。
將通道設定為非阻塞之後,需要將通道註冊到選擇器中,註冊的同時需要指定一個選擇鍵的型別 (SelectionKey)。
選擇鍵(SelectionKey)可以認為是一種標記,標記通道的型別和狀態。
SelectionKey的靜態欄位:
OP_ACCEPT:用於套接字接受操作的操作集位
OP_CONNECT:用於套接字連線操作的操作集位
OP_READ:用於讀取操作的操作集位
OP_WRITE:用於寫入操作的操作集位
用於檢測通道狀態的方法:
方法名稱 | 說明 |
---|---|
isAcceptable() | 測試此鍵的通道是否已準備好接受新的套接字連線 |
isConnectable() | 測試此鍵的通道是否已完成其套接字連線操作 |
isReadable() | 測試此鍵的通道是否已準備好進行讀取 |
isWritable() | 測試此鍵的通道是否已準備好進行寫入 |
將通道註冊到選擇器:
ssChannel.register(selector, SelectionKey.OP_ACCEPT);
IO操作準備就緒的通道大於0,輪詢選擇器
while(selector.select()>0) { //獲取選擇鍵,根據不同的狀態做不同的操作 }
實現非阻塞式TCP協議網路通訊
非阻塞模式:channel.configureBlocking(false);
客戶端
package com.jikedaquan.nonblockingnio; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; import java.util.Date; import java.util.Scanner; public class Client { public static void main(String[] args) { SocketChannel sChannel=null; try { //1、獲取通道 sChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1",1666)); //2、切換非阻塞模式 sChannel.configureBlocking(false); //3、分配指定大小的緩衝區 ByteBuffer buf=ByteBuffer.allocate(1024); //4、傳送資料給服務端 Scanner scanner=new Scanner(System.in); //迴圈從控制檯錄入資料傳送給服務端 while(scanner.hasNext()) { String str=scanner.next(); buf.put((new Date().toString()+"\n"+str).getBytes()); buf.flip(); sChannel.write(buf); buf.clear(); } } catch (IOException e) { e.printStackTrace(); }finally { //5、關閉通道 if(sChannel!=null) { try { sChannel.close(); } catch (IOException e) { e.printStackTrace(); } } } } }
服務端
package com.jikedaquan.nonblockingnio; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; public class Server { public static void main(String[] args) throws IOException { //1、獲取通道 ServerSocketChannel ssChannel=ServerSocketChannel.open(); //2、切換非阻塞模式 ssChannel.configureBlocking(false); //3、繫結監聽的埠號 ssChannel.bind(new InetSocketAddress(1666)); //4、獲取選擇器 Selector selector=Selector.open(); //5、將通道註冊到選擇器上,並指定“監聽接收事件” ssChannel.register(selector, SelectionKey.OP_ACCEPT); //6、輪詢式的獲取選擇器上已經 “準備就緒”的事件 while(selector.select()>0) { //7、獲取當前選擇器中所有註冊的“選擇鍵(已就緒的監聽事件)” Iterator<SelectionKey> it=selector.selectedKeys().iterator(); while(it.hasNext()) { //8、獲取準備就緒的事件 SelectionKey sk=it.next(); //9、判斷具體是什麼事件準備就緒 if(sk.isAcceptable()) { //10、若“接收就緒”,獲取客戶端連線 SocketChannel sChannel=ssChannel.accept(); //11、切換非阻塞模式 sChannel.configureBlocking(false); //12、將該通道註冊到選擇器上 sChannel.register(selector, SelectionKey.OP_READ); }else if(sk.isReadable()) { //13、獲取當前選擇器上“讀就緒”狀態的通道 SocketChannel sChannel=(SocketChannel)sk.channel(); //14、讀取資料 ByteBuffer buf=ByteBuffer.allocate(1024); int len=0; while((len=sChannel.read(buf))>0) { buf.flip(); System.out.println(new String(buf.array(),0,len)); buf.clear(); } } //15、取消選擇鍵 SelectionKey it.remove(); } } } }
服務端接收客戶端的操作需要在判斷 isAcceptable() 方法內將就緒的套接字通道以讀操作註冊到 選擇器中
在判斷 isReadable() 內從通道中獲取資料
實現非阻塞式UDP協議網路通訊
傳送端
package com.jikedaquan.nonblockingnio; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.DatagramChannel; import java.util.Scanner; public class TestDatagramSend { public static void main(String[] args) throws IOException { //獲取通道 DatagramChannel dChannel=DatagramChannel.open(); //非阻塞 dChannel.configureBlocking(false); ByteBuffer buf=ByteBuffer.allocate(1024); Scanner scanner=new Scanner(System.in); while(scanner.hasNext()) { String str=scanner.next(); buf.put(str.getBytes()); buf.flip(); //傳送資料到目標地址和埠 dChannel.send(buf,new InetSocketAddress("127.0.0.1", 1666)); buf.clear(); } dChannel.close(); } }
接收端
package com.jikedaquan.nonblockingnio; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.DatagramChannel; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.util.Iterator; public class TestDatagramReceive { public static void main(String[] args) throws IOException { //獲取通道 DatagramChannel dChannel=DatagramChannel.open(); dChannel.configureBlocking(false); //繫結監聽埠 dChannel.bind(new InetSocketAddress(1666)); //獲取選擇器 Selector selector=Selector.open(); //讀操作註冊通道 dChannel.register(selector, SelectionKey.OP_READ); while(selector.select()>0) { Iterator<SelectionKey> it=selector.selectedKeys().iterator(); //迭代選擇鍵 while(it.hasNext()) { SelectionKey sk=it.next(); //通道可讀 if(sk.isReadable()) { ByteBuffer buf=ByteBuffer.allocate(1024); //接收資料存入緩衝區 dChannel.receive(buf); buf.flip(); System.out.println(new String(buf.array(),0,buf.limit())); buf.clear(); } } it.remove(); } } }