Java NIO學習筆記(四) 使用JDK 1.7 NIO2.0 實現客戶端與伺服器的通訊
JDK1.7 提供了全新的非同步NIO模式。稱為:NIO2.0或AIO。該模式引入了新的非同步通道的概念,並提供了非同步檔案通道和非同步套接字通道的實現。非同步通道提供兩種方式獲取獲取操作結果。分別是:
- 通過java.util.concurrent.Future類來表示非同步操作的結果;
- CompletionHandler介面的實現類作為操作完成的回撥。
NIO2.0的非同步套接字通道是真正的非同步非阻塞I/O,它對應UNIX網路程式設計中的事件驅動I/O(AIO),它不需要通過多路複用器(Selector)對註冊的通道進行輪詢操作即可實現非同步讀寫,從而簡化了NIO的程式設計模型。
回撥模式
AIO的回撥模式的服務端程式碼:
public class SocketServiceCb {
AsynchronousServerSocketChannel asynchronousServerSocketChannel;
CountDownLatch latch;
public void start() {
try {
asynchronousServerSocketChannel = AsynchronousServerSocketChannel.open();
asynchronousServerSocketChannel.bind(new InetSocketAddress("127.0.0.1", 17777));
} catch (IOException e) {
e.printStackTrace();
}
latch = new CountDownLatch(1);
doAccept();
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private void doAccept() {
asynchronousServerSocketChannel.accept(this, new AccessCompleteHandler());
}
class AccessCompleteHandler implements CompletionHandler<AsynchronousSocketChannel, SocketServiceCb> {
@Override
public void completed(AsynchronousSocketChannel result, SocketServiceCb attachment) {
// 當我們呼叫AsynchronousServerSocketChannel的accept方法後,如果有新的客戶端連線接入,
// 系統將回調我們傳入的CompletionHandler例項的completed方法,表示新的客戶端已經接入成功,
// 因為一個AsynchronousServerSocket Channel可以接收成千上萬個客戶端,
// 所以我們需要繼續呼叫它的accept方法,接收其他的客戶端連線,
// 最終形成一個迴圈。每當接收一個客戶讀連線成功之後,再非同步接收新的客戶端連線。
attachment.asynchronousServerSocketChannel.accept(attachment, this);
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
byteBuffer.flip();
//引數1 ByteBuffer dst:接收緩衝區,用於從非同步Channel中讀取資料包;
//引數2 A attachment:非同步Channel攜帶的附件,通知回撥的時候作為入參使用。即回撥方法的第二個引數
//引數3 CompletionHandler<Integer,? super A>:接收通知回撥的業務handler,本例程中為ReadCompletionHandler。
result.read(byteBuffer, byteBuffer, new ReadCompleteHandler(result));
}
@Override
public void failed(Throwable exc, SocketServiceCb attachment) {
}
}
class ReadCompleteHandler implements CompletionHandler<Integer, ByteBuffer> {
private AsynchronousSocketChannel channel;
public ReadCompleteHandler(AsynchronousSocketChannel result) {
this.channel = result;
}
@Override
public void completed(Integer result, ByteBuffer attachment) {
//flip操作,為後續從緩衝區讀取資料做準備
System.out.print("資料大小為:"+attachment.remaining());
byte[] bytes = new byte[attachment.remaining()];
attachment.get(bytes);
try {
System.out.print("伺服器接收:" + new String(bytes, "UTF-8"));
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
doWrite("12312");
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
try {
this.channel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
private void doWrite(String data) {
ByteBuffer byteBuffer = ByteBuffer.wrap(data.getBytes());
byteBuffer.flip();
channel.write(byteBuffer, byteBuffer, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer attachment) {
if (attachment.hasRemaining()) {
channel.write(attachment, attachment, this);
}else {
latch.countDown();
}
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
});
}
}
}
CompletionHandler介面即為回撥,它有兩個方法,執行成功的回撥和異常回調,分別如下。
- public void completed(AsynchronousSocketChannel result, AsyncTimeServerHandler attachment);
- public void failed(Throwable exc, AsyncTimeServerHandler attachment)。
AIO的回撥模式的客戶端程式碼:
CountDownLatch latch;
@Test
public void testNIOCallBack() throws IOException {
final AsynchronousSocketChannel channel = AsynchronousSocketChannel.open();
latch = new CountDownLatch(1);
channel.connect(new InetSocketAddress("127.0.0.1", 17777),null, new CompletionHandler() {
@Override
public void completed(Object result, Object attachment) {
System.out.print("連結成功");
//final ByteBuffer byteBuffer = ByteBuffer.wrap("12312414".getBytes());
byte[] bytes = "1231231231".getBytes();
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
byteBuffer.put(bytes);
byteBuffer.flip();
channel.write(byteBuffer, byteBuffer, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer attachment) {
if(attachment.hasRemaining()){
channel.write(attachment, attachment, this);
}else {
System.out.print("傳送成功");
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
channel.read(readBuffer, readBuffer, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer attachment) {
byte[] bytes = new byte[attachment.remaining()];
attachment.get(bytes);
System.out.print("接受資訊:"+new String(bytes));
latch.countDown();
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
exc.printStackTrace();
}
});
}
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
exc.printStackTrace();
}
});
}
@Override
public void failed(Throwable exc, Object attachment) {
exc.printStackTrace();
}
});
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
JDK底層通過執行緒池ThreadPoolExecutor來執行回撥通知,非同步回撥通知類由sun.nio.ch.AsynchronousChannelGroupImpl實現,它經過層層呼叫,最終回撥com.phei.netty.aio.AsyncTimeClientHandler$1.completed方法,完成回撥通知。由此我們也可以得出結論:非同步Socket Channel是被動執行物件,我們不需要像NIO程式設計那樣建立一個獨立的I/O執行緒來處理讀寫操作。對於AsynchronousServerSocket Channel和AsynchronousSocketChannel,它們都由JDK底層的執行緒池負責回撥並驅動讀寫操作。
Future模式
Future模式伺服器端的程式碼
public class SocketServiceAIO {
private static ExecutorService executorService;
private static AsynchronousServerSocketChannel serverSocketChannel;
static class ChannelWorker implements Callable<String> {
private CharBuffer charBuffer;
private CharsetDecoder decoder = Charset.defaultCharset().newDecoder();
private AsynchronousSocketChannel channel;
ChannelWorker(AsynchronousSocketChannel channel) {
this.channel = channel;
}
@Override
public String call() throws Exception {
final ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
//讀取請求
while (channel.read(byteBuffer).get() != -1) {
byteBuffer.flip();
charBuffer = decoder.decode(byteBuffer);
String request = charBuffer.toString().trim();
System.out.println("客戶端請求:" + request);
ByteBuffer outByteBuffer = ByteBuffer.wrap("請求收到".getBytes());
Future future = channel.write(outByteBuffer);
future.get();
if (byteBuffer.hasRemaining()) {
byteBuffer.compact();
} else {
byteBuffer.clear();
}
}
channel.close();
return "OK";
}
}
private static void init() throws IOException {
executorService = Executors.newCachedThreadPool(Executors.defaultThreadFactory());
serverSocketChannel = AsynchronousServerSocketChannel.open();
if (serverSocketChannel.isOpen()) {
serverSocketChannel.setOption(StandardSocketOptions.SO_RCVBUF, 4 * 1024);
serverSocketChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
serverSocketChannel.bind(new InetSocketAddress("127.0.0.1", 17777));
} else {
throw new RuntimeException("通道未開啟");
}
}
private static void start() {
System.out.println("等待客戶端請求...");
while (true) {
//接收客戶端請求
Future<AsynchronousSocketChannel> future = serverSocketChannel.accept();
try {
//獲取請求
AsynchronousSocketChannel channel = future.get();
//提交給執行緒池
executorService.submit(new ChannelWorker(channel));
} catch (Exception ex) {
ex.printStackTrace();
System.err.println("伺服器關閉");
executorService.shutdown();
while (!executorService.isTerminated()) {
}
break;
}
}
}
public static void startTCPService() {
try {
init();
} catch (IOException e) {
e.printStackTrace();
System.out.println("AIO初始化失敗");
}
try {
start();
} catch (Exception e) {
e.printStackTrace();
System.out.println("AIO初始化失敗");
}
}
}
Future客戶端程式碼:
@Test
public void test() {
try {
start();
} catch (Exception e) {
e.printStackTrace();
System.out.println("AIO初始化失敗");
}
}
private void start() throws IOException, ExecutionException, InterruptedException {
AsynchronousSocketChannel socketChannel = AsynchronousSocketChannel.open();
if (socketChannel.isOpen()) {
socketChannel.setOption(StandardSocketOptions.SO_RCVBUF, 4 * 1024);
socketChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
Future future = socketChannel.connect(new InetSocketAddress("127.0.0.1", 17777));
Object connect = future.get();
if(connect != null){
throw new RuntimeException("連結失敗");
}
} else {
throw new RuntimeException("通道未開啟");
}
//傳送資料
Future future = socketChannel.write(ByteBuffer.wrap("我是客戶端".getBytes()));
future.get();
//讀取伺服器的傳送的資料
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
//讀取伺服器
while(socketChannel.read(byteBuffer).get() != -1){
byteBuffer.flip();
CharBuffer charBuffer = Charset.defaultCharset().newDecoder().decode(byteBuffer);
System.out.println("伺服器說:"+charBuffer.toString().trim());
byteBuffer.clear();
}
}
至此,我們學習了在java中幾種不同的網路程式設計模式。這裡總是一下:
1.非同步非阻塞I/O
即本節所述的JDK 1.7 AIO。很多人喜歡將JDK1.4提供的NIO框架稱為非同步非阻塞I/O,但是,如果嚴格按照UNIX網路程式設計模型和JDK的實現進行區分,實際上它只能被稱為非阻塞I/O,不能叫非同步非阻塞I/O。因為它是基於Selctor的select/poll模型實現,它是基於I/O複用技術的非阻塞I/O,不是非同步I/O。在JDK1.5 update10和Linux core2.6以上版本,Sun優化了Selctor的實現,它在底層使用epoll替換了select/poll,上層的API並沒有變化,可以認為是JDK NIO的一次效能優化,但是它仍舊沒有改變I/O的模型。
由JDK1.7提供的NIO2.0,新增了非同步的套接字通道,它是真正的非同步I/O,在非同步I/O操作的時候可以傳遞訊號變數,當操作完成之後會回撥相關的方法,非同步I/O也被稱為AIO。
2.多路複用器/選擇器 Selector
在前面的章節我們介紹過Java NIO的實現關鍵是多路複用I/O技術,多路複用的核心就是通過Selector來輪詢註冊在其上的Channel,當發現某個或者多個Channel處於就緒狀態後,從阻塞狀態返回就緒的Channel的選擇鍵集合,進行I/O操作。由於多路複用器是NIO實現非阻塞I/O的關鍵
3.偽非同步I/O
偽非同步I/O的概念完全來源於實踐。在JDK NIO程式設計沒有流行之前,為了解決Tomcat通訊執行緒同步I/O導致業務執行緒被掛住的問題,大家想到了一個辦法:在通訊執行緒和業務執行緒之間做個緩衝區,這個緩衝區用於隔離I/O執行緒和業務執行緒間的直接訪問,這樣業務執行緒就不會被I/O執行緒阻塞。而對於後端的業務側來說,將訊息或者Task放到執行緒池後就返回了,它不再直接訪問I/O執行緒或者進行I/O讀寫,這樣也就不會被同步阻塞。像這樣通過執行緒池做緩衝區的做法來解決一連線一執行緒問題,習慣於稱它為偽非同步I/O,而官方並沒有偽非同步I/O這種說法,請大家注意。
4.同步阻塞IO
即最簡單,也最好理解,一個Socket連結,開啟一個執行緒去處理。
如何選擇
如果客戶端併發連線數不多,伺服器的負載也不重,那就完全沒必要選擇NIO做服務端,畢竟非阻塞的處理IO是比阻塞IO的響應時間要慢一些(涉及多執行緒的上下文切換等問題);如果是相反情況,那就要考慮選擇合適的NIO框架進行開發。但並建議直接利用JDK的NIO來開發。
開發出高質量的NIO程式並不是一件簡單的事情,除去NIO固有的複雜性和BUG不談,作為一個NIO服務端,需要能夠處理網路的閃斷、客戶端的重複接入、客戶端的安全認證、訊息的編解碼、半包讀寫、網路擁塞等情況。由於NIO還涉及到Reactor模式,如果你沒有足夠的NIO網路程式設計和多執行緒程式設計經驗積累,一個NIO框架的穩定往往需要半年甚至更長的時間。更為糟糕的是,一旦在生產環境中發生問題,往往會導致跨節點的服務呼叫中斷,嚴重的可能會導致整個叢集環境都不可用,需要重啟伺服器,這種非正常停機會帶來巨大的損失。
從可維護性角度看,由於NIO採用了非同步非阻塞程式設計模型,而且是一個I/O執行緒處理多條鏈路,它的除錯和跟蹤非常麻煩,特別是生產環境中的問題,我們無法進行有效的除錯和跟蹤,往往只能靠一些日誌來輔助分析,定位難度很大。
由於上述原因,在大多數場景下,不建議大家直接使用JDK的NIO類庫,除非你精通NIO程式設計或者有特殊的需求。在絕大多數的業務場景中,我們可以使用NIO框架Netty來進行NIO程式設計,它既可以作為客戶端也可以作為服務端,同時支援UDP和非同步檔案傳輸,功能非常強大。
後續筆者將記錄Netty的學習筆記