java網路程式設計(四)----非同步非阻塞aio及proactor模型
(aio)NIO 2.0引入了新的非同步通道的概念,並提供了非同步檔案通道和非同步套接字通道的實現。非同步的套接字通道時真正的非同步非阻塞I/O,對應於UNIX網路程式設計中的事件驅動I/O(AIO)。他不需要過多的Selector對註冊的通道進行輪詢即可實現非同步讀寫,從而簡化了NIO的程式設計模型。
jdk7以前的nio是非阻塞IO,作業系統底層比方說linux,是用IO複用select或者epoll實現的,也不是非同步IO啊。非同步IO在linux上目前僅限於檔案系統,並且還沒有得到廣泛應用,很多平臺都沒有這玩意。
java aio在windows上是利用iocp實現的,這是真正的非同步IO。而在linux上,是通過epoll模擬非同步的。
AIO使用
AIO 提供四種類型的非同步通道以及不同的 I/O 操作:
AsynchronousSocketChannel:connect,read,write
AsynchronousFileChannel:lock,read,write
AsynchronousServerSocketChannel:accept
AsynchronousDatagramChannel:read,write,send,receive
當然我們重點還是將網路IO(tcp)方面的使用,先關注 AsynchronousSocketChannel ,首先簡單瀏覽一下該型別的 API。
AsynchronousSocketChannel類定義
public abstract class AsynchronousSocketChannel implements AsynchronousByteChannel, NetworkChannel
建立一個非同步網路通道,並且繫結到一個預設組。
public static AsynchronousSocketChannel open() throws IOException
接著我們可以呼叫AsynchronousSocketChannel 的各種方法進行非同步操作,呼叫時候需要傳入一個CompletionHandler介面引數作為回撥介面。
1.將非同步網路通道連線到遠端伺服器,使用指定的 CompletionHandler 聽候完成通知。
public abstract <A> void connect(SocketAddress remote, A attachment, CompletionHandler<Void,? super A> handler)
2.從非同步網路通道讀取資料到指定的緩衝區,使用指定的 CompletionHandler 聽候完成通知。
public final <A> void read(ByteBuffer dst, A attachment, CompletionHandler<Integer,? super A> handler)
3.向非同步網路通道寫緩衝區中的資料,使用指定的 CompletionHandler 聽候完成通知。
public final <A> void write(ByteBuffer src, A attachment, CompletionHandler<Integer,? super A> handler)
CompletionHandler介面,作為非同步執行之後的處理介面,正確完成時回撥completed(V result, A attachment)方法,執行失敗回撥void failed(Throwable exc, A attachment)方法。
public interface CompletionHandler<V,A> {
//Invoked when an operation has completed.
void completed(V result, A attachment);
//Invoked when an operation fails.
void failed(Throwable exc, A attachment);
}
用起來很簡單,呼叫非同步方法然後將實現的回撥介面作為引數傳遞就什麼都不用理了等程式自己執行完畢回撥。
aio還有另外一種通過Future的使用方式,因為hi出現阻塞的情況這裡就不講述這個方法了。
Proactor
Proactor模式包含如下角色:
- Handle:控制代碼;用來標識socket的連線或者讀寫操作;
- Asynchronous Operation Processor:非同步操作處理器;負責執行非同步操作,一般由作業系統核心實現;
- Asynchronous Operation:非同步操作
- Completion Event Queue:完成事件佇列;非同步操作完成的結果放到佇列中等待後續使用
- Proactor:主動器;為應用程式程序提供事件迴圈;從完成事件佇列中取出非同步操作的結果,分發呼叫相應的後續處理邏輯;
- Completion Handler:完成事件介面;一般是由回撥函式組成的介面;
- Concrete Completion Handler:完成事件處理的具體邏輯實現;實現回撥介面,定義特定的應用處理邏輯。
執行過程:
- 應用程式啟動,呼叫非同步操作處理器提供的非同步操作介面函式,呼叫之後應用程式和非同步操作處理就獨立執行;應用程式可以呼叫新的非同步操作,而其它操作可以併發進行;
- 應用程式啟動Proactor主動器,進行無限的事件迴圈,等待完成事件到來;
- 非同步操作處理器執行非同步操作,完成後將結果放入到完成事件佇列;
主動器從完成事件佇列中取出結果,分發到相應的完成事件回撥函式處理邏輯中;
我們在java aio程式中能接觸到的只有Handle和Completion Handler,使用時呼叫
AsynchronousSocketChannel:connect,read,write和
AsynchronousServerSocketChannel:accept方法作為控制代碼Handle;接著程式就會自己按照上面的流程幫我們實現非同步操作,最後在proactor再將執行結果分發到對應的Completion Handler中,回撥我們實現的completed或者failed函式。
在Proactor模式中,事件處理者(或者代由事件分發器發起)直接發起一個非同步讀寫操作(相當於請求),而實際的工作是由作業系統來完成的。發起時,需要提供的引數包括用於存放讀到資料的快取區、讀的資料大小或用於存放外發資料的快取區,以及這個請求完後的回撥函式等資訊。事件分發器得知了這個請求,它默默等待這個請求的完成,然後轉發完成事件給相應的事件處理者或者回調。舉例來說,在Windows上事件處理者投遞了一個非同步IO操作(稱為overlapped技術),事件分發器等IO Complete事件完成。這種非同步模式的典型實現是基於作業系統底層非同步API的,所以我們可稱之為“系統級別”的或者“真正意義上”的非同步,因為具體的讀寫是由作業系統代勞的。
aio程式碼實現
public class Server {
private static int DEFAULT_PORT = 12345;
private static AsyncServerHandler serverHandle;
public volatile static long clientCount;
public static void start(){
start(DEFAULT_PORT);
}
public static synchronized void start(int port) {
if(null != serverHandle)
return;
serverHandle = new AsyncServerHandler(port);
/開啟伺服器執行緒
new Thread(serverHandle,"Server").start();
}
public static void main(String[] args){
Server.start();
}
}
- 伺服器執行緒實現,建立serverSocket並非同步執行其accept方法
public class AsyncServerHandler implements Runnable {
public CountDownLatch latch;
public AsynchronousServerSocketChannel channel;
public AsyncServerHandler(int port) {
try {
//建立服務端通道
channel = AsynchronousServerSocketChannel.open();
//繫結埠
channel.bind(new InetSocketAddress(port));
System.out.println("伺服器已啟動,埠號:" + port);
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void run() {
//CountDownLatch初始化
latch = new CountDownLatch(1);
//用於接收客戶端的連線
//這句程式碼只能使用一次,接收一個客戶端的請求
//要實現多請求處理可以看AcceptHandler()引數的實現
channel.accept(this,new AcceptHandler());
System.out.println("主執行緒繼續執行");
try {
//讓現場在此阻塞,防止服務端執行完成後退出
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
2.伺服器接收到連線之後回撥實現了CompletionHandler的AcceptHandler 類。
//作為handler接收客戶端連線
public class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel, AsyncServerHandler> {
@Override
public void completed(AsynchronousSocketChannel channel,AsyncServerHandler serverHandler) {
//繼續接受其他客戶端的請求
Server.clientCount++;
System.out.println("連線的客戶端數:" + Server.clientCount);
//!!!!這行是繼續非同步接受其他請求的關鍵
//每接收一個連線之後就再執行一次非同步連線請求,這樣就能一直處理多個連線了
serverHandler.channel.accept(serverHandler, this);
//建立新的Buffer
ByteBuffer buffer = ByteBuffer.allocate(1024);
//非同步讀,第三個引數為接收訊息回撥的業務Handler
channel.read(buffer, buffer, new ReadHandler(channel));
}
@Override
public void failed(Throwable exc, AsyncServerHandler serverHandler) {
exc.printStackTrace();
serverHandler.latch.countDown();
}
}
3.上面的AcceptHandler非同步呼叫了讀操作,於是讀操作執行成功後呼叫ReadHandler 的completed()方法。另外寫操作對應的回撥介面作為內部類實現了。
public class ReadHandler implements CompletionHandler<Integer, ByteBuffer> {
//用於讀取半包訊息和傳送應答
private AsynchronousSocketChannel channel;
public ReadHandler(AsynchronousSocketChannel channel) {
this.channel = channel;
}
//讀取到訊息後的處理
@Override
public void completed(Integer result, ByteBuffer attachment) {
//flip操作
attachment.flip();
//根據
byte[] message = new byte[attachment.remaining()];
attachment.get(message);
try {
String expression = new String(message, "UTF-8");
System.out.println("伺服器收到訊息: " + expression);
String calrResult = "這時來自伺服器的訊息";
//向客戶端傳送訊息
doWrite(calrResult);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
//傳送訊息
private void doWrite(String result) {
byte[] bytes = result.getBytes();
ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
writeBuffer.put(bytes);
writeBuffer.flip();
//非同步寫資料 引數與前面的read一樣
//將回調介面作為內部類實現了
channel.write(writeBuffer, writeBuffer,new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer buffer) {
//如果沒有傳送完,就繼續傳送直到完成
if (buffer.hasRemaining())
channel.write(buffer, buffer, this);
else{
//建立新的Buffer
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
//非同步讀 第三個引數為接收訊息回撥的業務Handler
channel.read(readBuffer, readBuffer, new ReadHandler(channel));
}
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
try {
channel.close();
} catch (IOException e) {
}
}
});
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
try {
this.channel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}