JAVA NIO2模式之Proactor( JDK7 AIO非同步網路IO初探)
Java nio 2.0的主要改進就是引入了非同步IO(包括檔案和網路),這裡主要介紹下非同步網路IO API的使用以及框架的設計,以TCP服務端為例。首先看下為了支援AIO引入的新的類和介面:
java.nio.channels.AsynchronousChannel
標記一個channel支援非同步IO操作。
java.nio.channels.AsynchronousServerSocketChannel
ServerSocket的aio版本,建立TCP服務端,繫結地址,監聽埠等。
java.nio.channels.AsynchronousSocketChannel
面向流的非同步socket channel,表示一個連線。
java.nio.channels.AsynchronousChannelGroup
非同步channel的分組管理,目的是為了資源共享。一個AsynchronousChannelGroup繫結一個執行緒池,這個執行緒池執行兩個任務:處理IO事件和派發CompletionHandler。AsynchronousServerSocketChannel建立的時候可以傳入一個 AsynchronousChannelGroup,那麼通過AsynchronousServerSocketChannel建立的 AsynchronousSocketChannel將同屬於一個組,共享資源。
java.nio.channels.CompletionHandler
非同步IO操作結果的回撥介面,用於定義在IO操作完成後所作的回撥工作。AIO的API允許兩種方式來處理非同步操作的結果:返回的Future模式或者註冊CompletionHandler,我更推薦用CompletionHandler的方式,這些handler的呼叫是由 AsynchronousChannelGroup的執行緒池派發的。顯然,執行緒池的大小是效能的關鍵因素。AsynchronousChannelGroup允許繫結不同的執行緒池,通過三個靜態方法來建立: Java程式碼
- publicstatic AsynchronousChannelGroup withFixedThreadPool(
- ThreadFactory threadFactory)
- throws IOException
- publicstatic AsynchronousChannelGroup withCachedThreadPool(ExecutorService executor,
- int initialSize)
- publicstatic AsynchronousChannelGroup withThreadPool(ExecutorService executor)
- throws IOException
需要根據具體應用相應調整,從框架角度出發,需要暴露這樣的配置選項給使用者。
在介紹完了aio引入的TCP的主要介面和類之後,我們來設想下一個aio框架應該怎麼設計。參考非阻塞nio框架的設計,一般都是採用Reactor模式,Reacot負責事件的註冊、select、事件的派發;相應地,非同步IO有個Proactor模式,Proactor負責 CompletionHandler的派發,檢視一個典型的IO寫操作的流程來看兩者的區別:
Reactor: send(msg) -> 訊息佇列是否為空,如果為空 -> 向Reactor註冊OP_WRITE,然後返回 -> Reactor select -> 觸發Writable,通知使用者執行緒去處理 ->先登出Writable(很多人遇到的cpu 100%的問題就在於沒有登出),處理Writeable,如果沒有完全寫入,繼續註冊OP_WRITE。注意到,寫入的工作還是使用者執行緒在處理。
Proactor: send(msg) -> 訊息佇列是否為空,如果為空,發起read非同步呼叫,並註冊CompletionHandler,然後返回。 -> 作業系統負責將你的訊息寫入,並返回結果(寫入的位元組數)給Proactor -> Proactor派發CompletionHandler。可見,寫入的工作是作業系統在處理,無需使用者執行緒參與。事實上在aio的API 中,AsynchronousChannelGroup就扮演了Proactor的角色。
CompletionHandler有三個方法,分別對應於處理成功、失敗、被取消(通過返回的Future)情況下的回撥處理:
- publicinterface CompletionHandler<V,A> {
- void completed(V result, A attachment);
- void failed(Throwable exc, A attachment);
- void cancelled(A attachment);
- }
其中的泛型引數V表示IO呼叫的結果,而A是發起呼叫時傳入的attchment。
在初步介紹完aio引入的類和介面後,我們看看一個典型的tcp服務端是怎麼啟動的,怎麼接受連線並處理讀和寫,這裡引用的程式碼都是yanf4j 的aio分支中的程式碼,可以從svn checkout,svn地址:http://yanf4j.googlecode.com/svn/branches/yanf4j-aio
第一步,建立一個AsynchronousServerSocketChannel,建立之前先建立一個 AsynchronousChannelGroup,上文提到AsynchronousServerSocketChannel可以繫結一個 AsynchronousChannelGroup,那麼通過這個AsynchronousServerSocketChannel建立的連線都將同屬於一個AsynchronousChannelGroup並共享資源:
- this.asynchronousChannelGroup = AsynchronousChannelGroup
- .withCachedThreadPool(Executors.newCachedThreadPool(),
- this.threadPoolSize);
然後初始化一個AsynchronousServerSocketChannel,通過open方法:
Java程式碼- this.serverSocketChannel = AsynchronousServerSocketChannel
- .open(this.asynchronousChannelGroup);
通過nio 2.0引入的SocketOption類設定一些TCP選項:
Java程式碼- this.serverSocketChannel
- .setOption(
- StandardSocketOption.SO_REUSEADDR,true);
- this.serverSocketChannel
- .setOption(
- StandardSocketOption.SO_RCVBUF,16*1024);
繫結本地地址:
- this.serverSocketChannel
- .bind(new InetSocketAddress("localhost",8080), 100);
其中的100用於指定等待連線的佇列大小(backlog)。完了嗎?還沒有,最重要的監聽工作還沒開始,監聽埠是為了等待連線上來以便accept產生一個AsynchronousSocketChannel來表示一個新建立的連線,因此需要發起一個accept呼叫,呼叫是非同步的,作業系統將在連線建立後,將最後的結果——AsynchronousSocketChannel返回給你:
- publicvoid pendingAccept() {
- if (this.started && this.serverSocketChannel.isOpen()) {
- this.acceptFuture = this.serverSocketChannel.accept(null,
- new AcceptCompletionHandler());
- } else {
- thrownew IllegalStateException("Controller has been closed");
- }
- }
注意,重複的accept呼叫將會丟擲PendingAcceptException,後文提到的read和write也是如此。accept方法的第一個引數是你想傳給CompletionHandler的attchment,第二個引數就是註冊的用於回撥的CompletionHandler,最後返回結果Future<AsynchronousSocketChannel>。你可以對future做處理,這裡採用更推薦的方式就是註冊一個CompletionHandler。那麼accept的CompletionHandler中做些什麼工作呢?顯然一個赤裸裸的
AsynchronousSocketChannel是不夠的,我們需要將它封裝成session,一個session表示一個連線(mina裡就叫 IoSession了),裡面帶了一個緩衝的訊息佇列以及一些其他資源等。在連線建立後,除非你的伺服器只准備接受一個連線,不然你需要在後面繼續呼叫pendingAccept來發起另一個accept請求:
- privatefinalclass AcceptCompletionHandler implements
- CompletionHandler<AsynchronousSocketChannel, Object> {
- @Override
- publicvoid cancelled(Object attachment) {
- logger.warn("Accept operation was canceled");
- }
- @Override
- publicvoid completed(AsynchronousSocketChannel socketChannel,
- Object attachment) {
- try {
- logger.debug("Accept connection from "
- + socketChannel.getRemoteAddress());
- configureChannel(socketChannel);
- AioSessionConfig sessionConfig = buildSessionConfig(socketChannel);
- Session session = new AioTCPSession(sessionConfig,
- AioTCPController.this.configuration
- .getSessionReadBufferSize(),
- AioTCPController.this.sessionTimeout);
- session.start();
- registerSession(session);
- } catch (Exception e) {
- e.printStackTrace();
- logger.error("Accept error", e);
- notifyException(e);
- } finally {
- <strong>pendingAccept</strong>();
- }
- }
- @Override
- publicvoid failed(Throwable exc, Object attachment) {
- logger.error("Accept error", exc);
- try {
- notifyException(exc);
- } finally {
- <strong>pendingAccept</strong>();
- }
- }
- }
注意到了吧,我們在failed和completed方法中在最後都呼叫了pendingAccept來繼續發起accept呼叫,等待新的連線上來。有的同學可能要說了,這樣搞是不是遞迴呼叫,會不會堆疊溢位?實際上不會,因為發起accept呼叫的執行緒與CompletionHandler回撥的執行緒並非同一個,不是一個上下文中,兩者之間沒有耦合關係。要注意到,CompletionHandler的回撥共用的是 AsynchronousChannelGroup繫結的執行緒池,因此千萬別在CompletionHandler回撥方法中呼叫阻塞或者長時間的操作,例如sleep,回撥方法最好能支援超時,防止執行緒池耗盡。
連線建立後,怎麼讀和寫呢?回憶下在nonblocking nio框架中,連線建立後的第一件事是幹什麼?註冊OP_READ事件等待socket可讀。非同步IO也同樣如此,連線建立後馬上發起一個非同步read呼叫,等待socket可讀,這個是Session.start方法中所做的事情:
- publicclass AioTCPSession {
- protectedvoid start0() {
- pendingRead();
- }
- protectedfinalvoid pendingRead() {
- if (!isClosed() && this.asynchronousSocketChannel.isOpen()) {
- if (!this.readBuffer.hasRemaining()) {
- this.readBuffer = ByteBufferUtils
- .increaseBufferCapatity(this.readBuffer);
- }
- this.readFuture = this.asynchronousSocketChannel.read(
- this.readBuffer, this, this.readCompletionHandler);
- } else {
- thrownew IllegalStateException(
- "Session Or Channel has been closed");
- }
- }
- }
AsynchronousSocketChannel的read呼叫與AsynchronousServerSocketChannel的accept呼叫類似,同樣是非阻塞的,返回結果也是一個Future,但是寫的結果是整數,表示寫入了多少位元組,因此read呼叫返回的是Future<Integer>,方法的第一個引數是讀的緩衝區,作業系統將IO讀到資料拷貝到這個緩衝區,第二個引數是傳遞給 CompletionHandler的attchment,第三個引數就是註冊的用於回撥的CompletionHandler。這裡儲存了read的結果Future,這是為了在關閉連線的時候能夠主動取消呼叫,accept也是如此。現在可以看看read的CompletionHandler的實現:
Java程式碼- publicfinalclass ReadCompletionHandler implements
- CompletionHandler<Integer, AbstractAioSession> {
- privatestaticfinal Logger log = LoggerFactory
- .getLogger(ReadCompletionHandler.class);
- protectedfinal AioTCPController controller;
- public ReadCompletionHandler(AioTCPController controller) {
- this.controller = controller;
- }
- @Override
- publicvoid cancelled(AbstractAioSession session) {
- log.warn("Session(" + session.getRemoteSocketAddress()
- + ") read operation was canceled");
- }
- @Override
- publicvoid completed(Integer result, AbstractAioSession session) {
- if (log.isDebugEnabled())
- log.debug("Session(" + session.getRemoteSocketAddress()
- + ") read +" + result + " bytes");
- if (result < 0) {
- session.close();
- return;
- }
- try {
- if (result > 0) {
- session.updateTimeStamp();
- session.getReadBuffer().flip();
- session.decode();
- session.getReadBuffer().compact();
- }
- } finally {
- try {
- session.pendingRead();
- } catch (IOException e) {
- session.onException(e);
- session.close();
- }
- }
- controller.checkSessionTimeout();
- }
- @Override
- publicvoid failed(Throwable exc, AbstractAioSession session) {
- log.error("Session read error", exc);
- session.onException(exc);
- session.close();
- }
- }
如果IO讀失敗,會返回失敗產生的異常,這種情況下我們就主動關閉連線,通過session.close()方法,這個方法幹了兩件事情:關閉channel和取消read呼叫:
Java程式碼- if (null != this.readFuture) {
- this.readFuture.cancel(true);
- }
- this.asynchronousSocketChannel.close();
在讀成功的情況下,我們還需要判斷結果result是否小於0,如果小於0就表示對端關閉了,這種情況下我們也主動關閉連線並返回。如果讀到一定位元組,也就是result大於0的情況下,我們就嘗試從讀緩衝區中decode出訊息,並派發給業務處理器的回撥方法,最終通過pendingRead繼續發起read呼叫等待socket的下一次可讀。可見,我們並不需要自己去呼叫channel來進行IO讀,而是作業系統幫你直接讀到了緩衝區,然後給你一個結果表示讀入了多少位元組,你處理這個結果即可。而nonblocking
IO框架中,是reactor通知使用者執行緒socket可讀了,然後使用者執行緒自己去呼叫read進行實際讀操作。這裡還有個需要注意的地方,就是decode出來的訊息的派發給業務處理器工作最好交給一個執行緒池來處理,避免阻塞group繫結的執行緒池。
IO寫的操作與此類似,不過通常寫的話我們會在session中關聯一個緩衝佇列來處理,沒有完全寫入或者等待寫入的訊息都存放在佇列中,佇列為空的情況下發起write呼叫:
- protectedvoid write0(WriteMessage message) {
- boolean needWrite = false;
- synchronized (this.writeQueue) {
- needWrite = this.writeQueue.isEmpty();
- this.writeQueue.offer(message);
- }
- if (needWrite) {
- pendingWrite(message);
- }
- }
- protectedfinalvoid pendingWrite(WriteMessage message) {
- message = preprocessWriteMessage(message);
- if (!isClosed() && this.asynchronousSocketChannel.isOpen()) {
- this.asynchronousSocketChannel.write(message.getWriteBuffer(),
- this, this.writeCompletionHandler);
- } else {
- thrownew IllegalStateException(
- "Session Or Channel has been closed");
- }
- }
write呼叫返回的結果與read一樣是一個Future<Integer>,而write的CompletionHandler處理的核心邏輯大概是這樣:
Java程式碼- @Override
- publicvoid completed(Integer result, AbstractAioSession session) {
- if (log.isDebugEnabled())
- log.debug("Session(" + session.getRemoteSocketAddress()
- + ") writen " + result + " bytes");
- WriteMessage writeMessage;
- Queue<WriteMessage> writeQueue = session.getWriteQueue();
- synchronized (writeQueue) {
- writeMessage = writeQueue.peek();
- if (writeMessage.getWriteBuffer() == null
- || !writeMessage.getWriteBuffer().hasRemaining()) {
- writeQueue.remove();
- if (writeMessage.getWriteFuture() != null) {
- writeMessage.getWriteFuture().setResult(Boolean.TRUE);
- }
- try {
- session.getHandler().onMessageSent(session,
- writeMessage.getMessage());
- } catch (Exception e) {
- session.onException(e);
- }
- writeMessage = writeQueue.peek();
- }
- }
- if (writeMessage != null) {
- try {
- session.pendingWrite(writeMessage);
- } catch (IOException e) {
- session.onException(e);
- session.close();
- }
- }
- }
compete方法中的result就是實際寫入的位元組數,然後我們判斷訊息的緩衝區是否還有剩餘,如果沒有就將訊息從佇列中移除,如果佇列中還有訊息,那麼繼續發起write呼叫。
重複一下,這裡引用的程式碼都是yanf4j aio分支中的原始碼,感興趣的朋友可以直接check out出來看看:
http://yanf4j.googlecode.com/svn/branches/yanf4j-aio。
在引入了aio之後,java對於網路層的支援已經非常完善,該有的都有了,java也已經成為伺服器開發的首選語言之一。java的弱項在於對記憶體的管理上,由於這一切都交給了GC,因此在高效能的網路伺服器上還是Cpp的天下。java這種單一堆模型比之erlang的程序內堆模型還是有差距,很難做到高效的垃圾回收和細粒度的記憶體管理。
這裡僅僅是介紹了aio開發的核心流程,對於一個網路框架來說,還需要考慮超時的處理、緩衝buffer的處理、業務層和網路層的切分、可擴充套件性、效能的可調性以及一定的通用性要求。
jdk7以前的nio是非阻塞IO,作業系統底層比方說linux,是用IO複用select實現的
epoll也不是非同步IO啊。非同步IO在linux上目前僅限於檔案系統,並且還沒有得到廣泛應用,很多平臺都沒有這玩意。
java aio在windows上是利用iocp實現的,這是真正的非同步IO。而在linux上,是通過epoll模擬非同步的
參考:
[1] NIO.2 入門,第 1 部分: 非同步通道 API http://www.ibm.com/developerworks/cn/java/j-nio2-1/
[2] NIO.2 入門,第 2 部分: 檔案系統 API:http://www.ibm.com/developerworks/cn/java/j-nio2-2/index.html