Java入門系列-25-NIO(實現非阻塞網絡通信)
還記得之前介紹NIO時對比傳統IO的一大特點嗎?就是NIO是非阻塞式的,這篇文章帶大家來看一下非阻塞的網絡操作。
補充:以數組的形式使用緩沖區
package testnio; import java.io.IOException; import java.io.RandomAccessFile; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; public class TestBufferArray { public static void main(String[] args) throws IOException { RandomAccessFile raf1=new RandomAccessFile("D:/1.txt","rw"); //1.獲取通道 FileChannel channel1=raf1.getChannel(); //2.創建緩沖區數組 ByteBuffer buf1=ByteBuffer.allocate(512); ByteBuffer buf2=ByteBuffer.allocate(512); ByteBuffer[] bufs= {buf1,buf2}; //3.將數據讀入緩沖區數組 channel1.read(bufs); for (ByteBuffer byteBuffer : bufs) { byteBuffer.flip(); } System.out.println(new String(bufs[0].array(),0,bufs[0].limit())); System.out.println("-----------"); System.out.println(new String(bufs[1].array(),0,bufs[1].limit())); //寫入緩沖區數組到通道中 RandomAccessFile raf2=new RandomAccessFile("D:/2.txt","rw"); FileChannel channel2=raf2.getChannel(); channel2.write(bufs); } }
使用NIO實現阻塞式網絡通信
TCP協議的網絡通信傳統實現方式是通過套接字編程(Socket和ServerSocket),NIO實現TCP網絡通信需要用到 Channel 接口的兩個實現類:SocketChannel和ServerSocketChannel
使用NIO實現阻塞式網絡通信
客戶端
package com.jikedaquan.blockingnio; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.nio.channels.SocketChannel; import java.nio.file.Paths; import java.nio.file.StandardOpenOption; public class Client { public static void main(String[] args) { SocketChannel sChannel=null; FileChannel inChannel=null; try { //1、獲取通道 sChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 1666)); //用於讀取文件 inChannel = FileChannel.open(Paths.get("F:/a.jpg"), StandardOpenOption.READ); //2、分配指定大小的緩沖區 ByteBuffer buf=ByteBuffer.allocate(1024); //3、讀取本地文件,發送到服務器端 while(inChannel.read(buf)!=-1) { buf.flip(); sChannel.write(buf); buf.clear(); } } catch (IOException e) { e.printStackTrace(); }finally { //關閉通道 if (inChannel!=null) { try { inChannel.close(); } catch (IOException e) { e.printStackTrace(); } } if(sChannel!=null) { try { sChannel.close(); } catch (IOException e) { e.printStackTrace(); } } } } }
new InetSocketAddress("127.0.0.1", 1666) 用於向客戶端套接字通道(SocketChannel)綁定要連接地址和端口
服務端
package com.jikedaquan.blockingnio; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.nio.file.Paths; import java.nio.file.StandardOpenOption; public class Server { public static void main(String[] args) { ServerSocketChannel ssChannel=null; FileChannel outChannel=null; SocketChannel sChannel=null; try { //1、獲取通道 ssChannel = ServerSocketChannel.open(); //用於保存文件的通道 outChannel = FileChannel.open(Paths.get("F:/b.jpg"), StandardOpenOption.WRITE,StandardOpenOption.CREATE); //2、綁定要監聽的端口號 ssChannel.bind(new InetSocketAddress(1666)); //3、獲取客戶端連接的通道 sChannel = ssChannel.accept(); //4、分配指定大小的緩沖區 ByteBuffer buf=ByteBuffer.allocate(1024); //5、接收客戶端的數據,並保存到本地 while(sChannel.read(buf)!=-1) { buf.flip(); outChannel.write(buf); buf.clear(); } } catch (IOException e) { e.printStackTrace(); }finally { //6、關閉通道 if(sChannel!=null) { try { sChannel.close(); } catch (IOException e) { e.printStackTrace(); } } if(outChannel!=null) { try { outChannel.close(); } catch (IOException e) { e.printStackTrace(); } } if(ssChannel!=null) { try { ssChannel.close(); } catch (IOException e) { e.printStackTrace(); } } } } }
服務端套接字僅綁定要監聽的端口即可
ssChannel.bind(new InetSocketAddress(1666));
上面的代碼使用NIO實現的網絡通信,可能有同學會問,沒有看到阻塞效果啊,確實是阻塞式的看不到效果,因為客戶端發送一次數據就結束了,服務端也是接收一次數據就結束了。那如果服務端接收完成數據後,再向客戶端反饋呢?
能夠看到阻塞效果的網絡通信
客戶端
package com.jikedaquan.blockingnio2;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.SocketChannel;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
public class Client {
public static void main(String[] args) {
SocketChannel sChannel=null;
FileChannel inChannel=null;
try {
sChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 1666));
inChannel = FileChannel.open(Paths.get("F:/a.jpg"), StandardOpenOption.READ);
ByteBuffer buf=ByteBuffer.allocate(1024);
while(inChannel.read(buf)!=-1) {
buf.flip();
sChannel.write(buf);
buf.clear();
}
//sChannel.shutdownOutput();//去掉註釋掉將不會阻塞
//接收服務器端的反饋
int len=0;
while((len=sChannel.read(buf))!=-1) {
buf.flip();
System.out.println(new String(buf.array(),0,len));
buf.clear();
}
} catch (IOException e) {
e.printStackTrace();
}finally {
if(inChannel!=null) {
try {
inChannel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if(sChannel!=null) {
try {
sChannel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
服務端
package com.jikedaquan.blockingnio2;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
public class Server {
public static void main(String[] args) {
ServerSocketChannel ssChannel=null;
FileChannel outChannel=null;
SocketChannel sChannel=null;
try {
ssChannel = ServerSocketChannel.open();
outChannel = FileChannel.open(Paths.get("F:/a.jpg"),StandardOpenOption.WRITE,StandardOpenOption.CREATE);
ssChannel.bind(new InetSocketAddress(1666));
sChannel = ssChannel.accept();
ByteBuffer buf=ByteBuffer.allocate(1024);
while(sChannel.read(buf)!=-1) {
buf.flip();
outChannel.write(buf);
buf.clear();
}
//發送反饋給客戶端
buf.put("服務端接收數據成功".getBytes());
buf.flip();
sChannel.write(buf);
} catch (IOException e) {
e.printStackTrace();
}finally {
if(sChannel!=null) {
try {
sChannel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if(outChannel!=null) {
try {
outChannel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if(ssChannel!=null) {
try {
ssChannel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
服務端將向客戶端發送兩次數據
選擇器(Selector)
想要實現非阻塞的IO,必須要先弄懂選擇器。Selector 抽象類,可通過調用此類的 open 方法創建選擇器,該方法將使用系統的默認選擇器提供者創建新的選擇器。
將通道設置為非阻塞之後,需要將通道註冊到選擇器中,註冊的同時需要指定一個選擇鍵的類型 (SelectionKey)。
選擇鍵(SelectionKey)可以認為是一種標記,標記通道的類型和狀態。
SelectionKey的靜態字段:
OP_ACCEPT:用於套接字接受操作的操作集位
OP_CONNECT:用於套接字連接操作的操作集位
OP_READ:用於讀取操作的操作集位
OP_WRITE:用於寫入操作的操作集位
用於檢測通道狀態的方法:
方法名稱 | 說明 |
---|---|
isAcceptable() | 測試此鍵的通道是否已準備好接受新的套接字連接 |
isConnectable() | 測試此鍵的通道是否已完成其套接字連接操作 |
isReadable() | 測試此鍵的通道是否已準備好進行讀取 |
isWritable() | 測試此鍵的通道是否已準備好進行寫入 |
將通道註冊到選擇器:
ssChannel.register(selector, SelectionKey.OP_ACCEPT);
IO操作準備就緒的通道大於0,輪詢選擇器
while(selector.select()>0) {
//獲取選擇鍵,根據不同的狀態做不同的操作
}
實現非阻塞式TCP協議網絡通信
非阻塞模式:channel.configureBlocking(false);
客戶端
package com.jikedaquan.nonblockingnio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.Date;
import java.util.Scanner;
public class Client {
public static void main(String[] args) {
SocketChannel sChannel=null;
try {
//1、獲取通道
sChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1",1666));
//2、切換非阻塞模式
sChannel.configureBlocking(false);
//3、分配指定大小的緩沖區
ByteBuffer buf=ByteBuffer.allocate(1024);
//4、發送數據給服務端
Scanner scanner=new Scanner(System.in);
//循環從控制臺錄入數據發送給服務端
while(scanner.hasNext()) {
String str=scanner.next();
buf.put((new Date().toString()+"\n"+str).getBytes());
buf.flip();
sChannel.write(buf);
buf.clear();
}
} catch (IOException e) {
e.printStackTrace();
}finally {
//5、關閉通道
if(sChannel!=null) {
try {
sChannel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
服務端
package com.jikedaquan.nonblockingnio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
public class Server {
public static void main(String[] args) throws IOException {
//1、獲取通道
ServerSocketChannel ssChannel=ServerSocketChannel.open();
//2、切換非阻塞模式
ssChannel.configureBlocking(false);
//3、綁定監聽的端口號
ssChannel.bind(new InetSocketAddress(1666));
//4、獲取選擇器
Selector selector=Selector.open();
//5、將通道註冊到選擇器上,並指定“監聽接收事件”
ssChannel.register(selector, SelectionKey.OP_ACCEPT);
//6、輪詢式的獲取選擇器上已經 “準備就緒”的事件
while(selector.select()>0) {
//7、獲取當前選擇器中所有註冊的“選擇鍵(已就緒的監聽事件)”
Iterator<SelectionKey> it=selector.selectedKeys().iterator();
while(it.hasNext()) {
//8、獲取準備就緒的事件
SelectionKey sk=it.next();
//9、判斷具體是什麽事件準備就緒
if(sk.isAcceptable()) {
//10、若“接收就緒”,獲取客戶端連接
SocketChannel sChannel=ssChannel.accept();
//11、切換非阻塞模式
sChannel.configureBlocking(false);
//12、將該通道註冊到選擇器上
sChannel.register(selector, SelectionKey.OP_READ);
}else if(sk.isReadable()) {
//13、獲取當前選擇器上“讀就緒”狀態的通道
SocketChannel sChannel=(SocketChannel)sk.channel();
//14、讀取數據
ByteBuffer buf=ByteBuffer.allocate(1024);
int len=0;
while((len=sChannel.read(buf))>0) {
buf.flip();
System.out.println(new String(buf.array(),0,len));
buf.clear();
}
}
//15、取消選擇鍵 SelectionKey
it.remove();
}
}
}
}
服務端接收客戶端的操作需要在判斷 isAcceptable() 方法內將就緒的套接字通道以讀操作註冊到 選擇器中
在判斷 isReadable() 內從通道中獲取數據
實現非阻塞式UDP協議網絡通信
發送端
package com.jikedaquan.nonblockingnio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.util.Scanner;
public class TestDatagramSend {
public static void main(String[] args) throws IOException {
//獲取通道
DatagramChannel dChannel=DatagramChannel.open();
//非阻塞
dChannel.configureBlocking(false);
ByteBuffer buf=ByteBuffer.allocate(1024);
Scanner scanner=new Scanner(System.in);
while(scanner.hasNext()) {
String str=scanner.next();
buf.put(str.getBytes());
buf.flip();
//發送數據到目標地址和端口
dChannel.send(buf,new InetSocketAddress("127.0.0.1", 1666));
buf.clear();
}
dChannel.close();
}
}
接收端
package com.jikedaquan.nonblockingnio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;
public class TestDatagramReceive {
public static void main(String[] args) throws IOException {
//獲取通道
DatagramChannel dChannel=DatagramChannel.open();
dChannel.configureBlocking(false);
//綁定監聽端口
dChannel.bind(new InetSocketAddress(1666));
//獲取選擇器
Selector selector=Selector.open();
//讀操作註冊通道
dChannel.register(selector, SelectionKey.OP_READ);
while(selector.select()>0) {
Iterator<SelectionKey> it=selector.selectedKeys().iterator();
//叠代選擇鍵
while(it.hasNext()) {
SelectionKey sk=it.next();
//通道可讀
if(sk.isReadable()) {
ByteBuffer buf=ByteBuffer.allocate(1024);
//接收數據存入緩沖區
dChannel.receive(buf);
buf.flip();
System.out.println(new String(buf.array(),0,buf.limit()));
buf.clear();
}
}
it.remove();
}
}
}
Java入門系列-25-NIO(實現非阻塞網絡通信)