1. 程式人生 > >JAVA NIO2模式之Proactor( JDK7 AIO非同步網路IO初探)

JAVA NIO2模式之Proactor( JDK7 AIO非同步網路IO初探)

  按照《Unix網路程式設計》的劃分,IO模型可以分為:阻塞IO、非阻塞IO、IO複用、訊號驅動IO和非同步IO,按照POSIX標準來劃分只分為兩類:同步IO和非同步IO。如何區分呢?首先一個IO操作其實分成了兩個步驟:發起IO請求和實際的IO操作,同步IO和非同步IO的區別就在於第二個步驟是否阻塞,如果實際的IO讀寫阻塞請求程序,那麼就是同步IO,因此阻塞IO、非阻塞IO、IO複用、訊號驅動IO都是同步IO,如果不阻塞,而是作業系統幫你做完IO操作再將結果返回給你,那麼就是非同步IO。阻塞IO和非阻塞IO的區別在於第一步,發起IO請求是否會被阻塞,如果阻塞直到完成那麼就是傳統的阻塞IO,如果不阻塞,那麼就是非阻塞IO。

   Java nio 2.0的主要改進就是引入了非同步IO(包括檔案和網路),這裡主要介紹下非同步網路IO API的使用以及框架的設計,以TCP服務端為例。首先看下為了支援AIO引入的新的類和介面:

 java.nio.channels.AsynchronousChannel
       標記一個channel支援非同步IO操作。

 java.nio.channels.AsynchronousServerSocketChannel
       ServerSocket的aio版本,建立TCP服務端,繫結地址,監聽埠等。

 java.nio.channels.AsynchronousSocketChannel

       面向流的非同步socket channel,表示一個連線。

 java.nio.channels.AsynchronousChannelGroup
       非同步channel的分組管理,目的是為了資源共享。一個AsynchronousChannelGroup繫結一個執行緒池,這個執行緒池執行兩個任務:處理IO事件和派發CompletionHandler。AsynchronousServerSocketChannel建立的時候可以傳入一個 AsynchronousChannelGroup,那麼通過AsynchronousServerSocketChannel建立的 AsynchronousSocketChannel將同屬於一個組,共享資源。

 java.nio.channels.CompletionHandler

       非同步IO操作結果的回撥介面,用於定義在IO操作完成後所作的回撥工作。AIO的API允許兩種方式來處理非同步操作的結果:返回的Future模式或者註冊CompletionHandler,我更推薦用CompletionHandler的方式,這些handler的呼叫是由 AsynchronousChannelGroup的執行緒池派發的。顯然,執行緒池的大小是效能的關鍵因素。AsynchronousChannelGroup允許繫結不同的執行緒池,通過三個靜態方法來建立: Java程式碼  收藏程式碼
  1. publicstatic AsynchronousChannelGroup withFixedThreadPool(
    int nThreads,  
  2.                                                               ThreadFactory threadFactory)  
  3.        throws IOException  
  4. publicstatic AsynchronousChannelGroup withCachedThreadPool(ExecutorService executor,  
  5.                                                                int initialSize)  
  6. publicstatic AsynchronousChannelGroup withThreadPool(ExecutorService executor)  
  7.        throws IOException  
 

     需要根據具體應用相應調整,從框架角度出發,需要暴露這樣的配置選項給使用者。

     在介紹完了aio引入的TCP的主要介面和類之後,我們來設想下一個aio框架應該怎麼設計。參考非阻塞nio框架的設計,一般都是採用Reactor模式,Reacot負責事件的註冊、select、事件的派發;相應地,非同步IO有個Proactor模式,Proactor負責 CompletionHandler的派發,檢視一個典型的IO寫操作的流程來看兩者的區別:

     Reactor:  send(msg) -> 訊息佇列是否為空,如果為空  -> 向Reactor註冊OP_WRITE,然後返回 -> Reactor select -> 觸發Writable,通知使用者執行緒去處理 ->先登出Writable(很多人遇到的cpu 100%的問題就在於沒有登出),處理Writeable,如果沒有完全寫入,繼續註冊OP_WRITE。注意到,寫入的工作還是使用者執行緒在處理。
     Proactor: send(msg) -> 訊息佇列是否為空,如果為空,發起read非同步呼叫,並註冊CompletionHandler,然後返回。 -> 作業系統負責將你的訊息寫入,並返回結果(寫入的位元組數)給Proactor -> Proactor派發CompletionHandler。可見,寫入的工作是作業系統在處理,無需使用者執行緒參與。事實上在aio的API 中,AsynchronousChannelGroup就扮演了Proactor的角色

    CompletionHandler有三個方法,分別對應於處理成功、失敗、被取消(通過返回的Future)情況下的回撥處理:

Java程式碼  收藏程式碼
  1. publicinterface CompletionHandler<V,A> {  
  2.      void completed(V result, A attachment);  
  3.     void failed(Throwable exc, A attachment);  
  4.     void cancelled(A attachment);  
  5. }  
 


    其中的泛型引數V表示IO呼叫的結果,而A是發起呼叫時傳入的attchment。

    在初步介紹完aio引入的類和介面後,我們看看一個典型的tcp服務端是怎麼啟動的,怎麼接受連線並處理讀和寫,這裡引用的程式碼都是yanf4j 的aio分支中的程式碼,可以從svn checkout,svn地址:http://yanf4j.googlecode.com/svn/branches/yanf4j-aio

    第一步,建立一個AsynchronousServerSocketChannel,建立之前先建立一個 AsynchronousChannelGroup,上文提到AsynchronousServerSocketChannel可以繫結一個 AsynchronousChannelGroup,那麼通過這個AsynchronousServerSocketChannel建立的連線都將同屬於一個AsynchronousChannelGroup並共享資源:

Java程式碼  收藏程式碼
  1. this.asynchronousChannelGroup = AsynchronousChannelGroup  
  2.                     .withCachedThreadPool(Executors.newCachedThreadPool(),  
  3.                             this.threadPoolSize);  

     然後初始化一個AsynchronousServerSocketChannel,通過open方法:

Java程式碼  收藏程式碼
  1. this.serverSocketChannel = AsynchronousServerSocketChannel  
  2.                 .open(this.asynchronousChannelGroup);  
 

    通過nio 2.0引入的SocketOption類設定一些TCP選項:

Java程式碼  收藏程式碼
  1. this.serverSocketChannel  
  2.                     .setOption(  
  3.                             StandardSocketOption.SO_REUSEADDR,true);  
  4. this.serverSocketChannel  
  5.                     .setOption(  
  6.                             StandardSocketOption.SO_RCVBUF,16*1024);  
 


    繫結本地地址:

Java程式碼  收藏程式碼
  1. this.serverSocketChannel  
  2.                     .bind(new InetSocketAddress("localhost",8080), 100);  
 

  
    其中的100用於指定等待連線的佇列大小(backlog)。完了嗎?還沒有,最重要的監聽工作還沒開始,監聽埠是為了等待連線上來以便accept產生一個AsynchronousSocketChannel來表示一個新建立的連線,因此需要發起一個accept呼叫,呼叫是非同步的,作業系統將在連線建立後,將最後的結果——AsynchronousSocketChannel返回給你:

Java程式碼  收藏程式碼
  1. publicvoid pendingAccept() {  
  2.         if (this.started && this.serverSocketChannel.isOpen()) {  
  3.             this.acceptFuture = this.serverSocketChannel.accept(null,  
  4.                     new AcceptCompletionHandler());  
  5.         } else {  
  6.             thrownew IllegalStateException("Controller has been closed");  
  7.         }  
  8.     }  
 


   注意,重複的accept呼叫將會丟擲PendingAcceptException,後文提到的read和write也是如此。accept方法的第一個引數是你想傳給CompletionHandler的attchment,第二個引數就是註冊的用於回撥的CompletionHandler,最後返回結果Future<AsynchronousSocketChannel>。你可以對future做處理,這裡採用更推薦的方式就是註冊一個CompletionHandler。那麼accept的CompletionHandler中做些什麼工作呢?顯然一個赤裸裸的 AsynchronousSocketChannel是不夠的,我們需要將它封裝成session,一個session表示一個連線(mina裡就叫 IoSession了),裡面帶了一個緩衝的訊息佇列以及一些其他資源等。在連線建立後,除非你的伺服器只准備接受一個連線,不然你需要在後面繼續呼叫pendingAccept來發起另一個accept請求

Java程式碼  收藏程式碼
  1. privatefinalclass AcceptCompletionHandler implements
  2.             CompletionHandler<AsynchronousSocketChannel, Object> {  
  3.         @Override
  4.         publicvoid cancelled(Object attachment) {  
  5.             logger.warn("Accept operation was canceled");  
  6.         }  
  7.         @Override
  8.         publicvoid completed(AsynchronousSocketChannel socketChannel,  
  9.                 Object attachment) {  
  10.             try {  
  11.                 logger.debug("Accept connection from "
  12.                         + socketChannel.getRemoteAddress());  
  13.                 configureChannel(socketChannel);  
  14.                 AioSessionConfig sessionConfig = buildSessionConfig(socketChannel);  
  15.                 Session session = new AioTCPSession(sessionConfig,  
  16.                         AioTCPController.this.configuration  
  17.                                 .getSessionReadBufferSize(),  
  18.                         AioTCPController.this.sessionTimeout);  
  19.                 session.start();  
  20.                 registerSession(session);  
  21.             } catch (Exception e) {  
  22.                 e.printStackTrace();  
  23.                 logger.error("Accept error", e);  
  24.                 notifyException(e);  
  25.             } finally {  
  26.                 <strong>pendingAccept</strong>();  
  27.             }  
  28.         }  
  29.         @Override
  30.         publicvoid failed(Throwable exc, Object attachment) {  
  31.             logger.error("Accept error", exc);  
  32.             try {  
  33.                 notifyException(exc);  
  34.             } finally {  
  35.                 <strong>pendingAccept</strong>();  
  36.             }  
  37.         }  
  38.     }  
 

 
    注意到了吧,我們在failed和completed方法中在最後都呼叫了pendingAccept來繼續發起accept呼叫,等待新的連線上來。有的同學可能要說了,這樣搞是不是遞迴呼叫,會不會堆疊溢位?實際上不會,因為發起accept呼叫的執行緒與CompletionHandler回撥的執行緒並非同一個,不是一個上下文中,兩者之間沒有耦合關係。要注意到,CompletionHandler的回撥共用的是 AsynchronousChannelGroup繫結的執行緒池,因此千萬別在CompletionHandler回撥方法中呼叫阻塞或者長時間的操作,例如sleep,回撥方法最好能支援超時,防止執行緒池耗盡。

    連線建立後,怎麼讀和寫呢?回憶下在nonblocking nio框架中,連線建立後的第一件事是幹什麼?註冊OP_READ事件等待socket可讀。非同步IO也同樣如此,連線建立後馬上發起一個非同步read呼叫,等待socket可讀,這個是Session.start方法中所做的事情:

Java程式碼  收藏程式碼
  1. publicclass AioTCPSession {  
  2.     protectedvoid start0() {  
  3.         pendingRead();  
  4.     }  
  5.     protectedfinalvoid pendingRead() {  
  6.         if (!isClosed() && this.asynchronousSocketChannel.isOpen()) {  
  7.             if (!this.readBuffer.hasRemaining()) {  
  8.                 this.readBuffer = ByteBufferUtils  
  9.                         .increaseBufferCapatity(this.readBuffer);  
  10.             }  
  11.             this.readFuture = this.asynchronousSocketChannel.read(  
  12.                     this.readBuffer, thisthis.readCompletionHandler);  
  13.         } else {  
  14.             thrownew IllegalStateException(  
  15.                     "Session Or Channel has been closed");  
  16.         }  
  17.     }  
  18. }  
 

     AsynchronousSocketChannel的read呼叫與AsynchronousServerSocketChannel的accept呼叫類似,同樣是非阻塞的,返回結果也是一個Future,但是寫的結果是整數,表示寫入了多少位元組,因此read呼叫返回的是Future<Integer>,方法的第一個引數是讀的緩衝區,作業系統將IO讀到資料拷貝到這個緩衝區,第二個引數是傳遞給 CompletionHandler的attchment,第三個引數就是註冊的用於回撥的CompletionHandler。這裡儲存了read的結果Future,這是為了在關閉連線的時候能夠主動取消呼叫,accept也是如此。現在可以看看read的CompletionHandler的實現:

Java程式碼  收藏程式碼
  1. publicfinalclass ReadCompletionHandler implements
  2.         CompletionHandler<Integer, AbstractAioSession> {  
  3.     privatestaticfinal Logger log = LoggerFactory  
  4.             .getLogger(ReadCompletionHandler.class);  
  5.     protectedfinal AioTCPController controller;  
  6.     public ReadCompletionHandler(AioTCPController controller) {  
  7.         this.controller = controller;  
  8.     }  
  9.     @Override
  10.     publicvoid cancelled(AbstractAioSession session) {  
  11.         log.warn("Session(" + session.getRemoteSocketAddress()  
  12.                 + ") read operation was canceled");  
  13.     }  
  14.     @Override
  15.     publicvoid completed(Integer result, AbstractAioSession session) {  
  16.         if (log.isDebugEnabled())  
  17.             log.debug("Session(" + session.getRemoteSocketAddress()  
  18.                     + ") read +" + result + " bytes");  
  19.         if (result < 0) {  
  20.             session.close();  
  21.             return;  
  22.         }  
  23.         try {  
  24.             if (result > 0) {  
  25.                 session.updateTimeStamp();  
  26.                 session.getReadBuffer().flip();  
  27.                 session.decode();  
  28.                 session.getReadBuffer().compact();  
  29.             }  
  30.         } finally {  
  31.             try {  
  32.                 session.pendingRead();  
  33.             } catch (IOException e) {  
  34.                 session.onException(e);  
  35.                 session.close();  
  36.             }  
  37.         }  
  38.         controller.checkSessionTimeout();  
  39.     }  
  40.     @Override
  41.     publicvoid failed(Throwable exc, AbstractAioSession session) {  
  42.         log.error("Session read error", exc);  
  43.         session.onException(exc);  
  44.         session.close();  
  45.     }  
  46. }  
 

   如果IO讀失敗,會返回失敗產生的異常,這種情況下我們就主動關閉連線,通過session.close()方法,這個方法幹了兩件事情:關閉channel和取消read呼叫:

Java程式碼  收藏程式碼
  1. if (null != this.readFuture) {  
  2.             this.readFuture.cancel(true);  
  3.         }  
  4. this.asynchronousSocketChannel.close();  
 

   在讀成功的情況下,我們還需要判斷結果result是否小於0,如果小於0就表示對端關閉了,這種情況下我們也主動關閉連線並返回。如果讀到一定位元組,也就是result大於0的情況下,我們就嘗試從讀緩衝區中decode出訊息,並派發給業務處理器的回撥方法,最終通過pendingRead繼續發起read呼叫等待socket的下一次可讀。可見,我們並不需要自己去呼叫channel來進行IO讀,而是作業系統幫你直接讀到了緩衝區,然後給你一個結果表示讀入了多少位元組,你處理這個結果即可。而nonblocking IO框架中,是reactor通知使用者執行緒socket可讀了,然後使用者執行緒自己去呼叫read進行實際讀操作。這裡還有個需要注意的地方,就是decode出來的訊息的派發給業務處理器工作最好交給一個執行緒池來處理,避免阻塞group繫結的執行緒池。
 
   IO寫的操作與此類似,不過通常寫的話我們會在session中關聯一個緩衝佇列來處理,沒有完全寫入或者等待寫入的訊息都存放在佇列中,佇列為空的情況下發起write呼叫:

Java程式碼  收藏程式碼
  1. protectedvoid write0(WriteMessage message) {  
  2.       boolean needWrite = false;  
  3.       synchronized (this.writeQueue) {  
  4.           needWrite = this.writeQueue.isEmpty();  
  5.           this.writeQueue.offer(message);  
  6.       }  
  7.       if (needWrite) {  
  8.           pendingWrite(message);  
  9.       }  
  10.   }  
  11.   protectedfinalvoid pendingWrite(WriteMessage message) {  
  12.       message = preprocessWriteMessage(message);  
  13.       if (!isClosed() && this.asynchronousSocketChannel.isOpen()) {  
  14.           this.asynchronousSocketChannel.write(message.getWriteBuffer(),  
  15.                   thisthis.writeCompletionHandler);  
  16.       } else {  
  17.           thrownew IllegalStateException(  
  18.                   "Session Or Channel has been closed");  
  19.       }  
  20.   }  
 

    write呼叫返回的結果與read一樣是一個Future<Integer>,而write的CompletionHandler處理的核心邏輯大概是這樣:

Java程式碼  收藏程式碼
  1. @Override
  2.     publicvoid completed(Integer result, AbstractAioSession session) {  
  3.         if (log.isDebugEnabled())  
  4.             log.debug("Session(" + session.getRemoteSocketAddress()  
  5.                     + ") writen " + result + " bytes");  
  6.         WriteMessage writeMessage;  
  7.         Queue<WriteMessage> writeQueue = session.getWriteQueue();  
  8.         synchronized (writeQueue) {  
  9.             writeMessage = writeQueue.peek();  
  10.             if (writeMessage.getWriteBuffer() == null
  11.                     || !writeMessage.getWriteBuffer().hasRemaining()) {  
  12.                 writeQueue.remove();  
  13.                 if (writeMessage.getWriteFuture() != null) {  
  14.                     writeMessage.getWriteFuture().setResult(Boolean.TRUE);  
  15.                 }  
  16.                 try {  
  17.                     session.getHandler().onMessageSent(session,  
  18.                             writeMessage.getMessage());  
  19.                 } catch (Exception e) {  
  20.                     session.onException(e);  
  21.                 }  
  22.                 writeMessage = writeQueue.peek();  
  23.             }  
  24.         }  
  25.         if (writeMessage != null) {  
  26.             try {  
  27.                 session.pendingWrite(writeMessage);  
  28.             } catch (IOException e) {  
  29.                 session.onException(e);  
  30.                 session.close();  
  31.             }  
  32.         }  
  33.     }  
 


   compete方法中的result就是實際寫入的位元組數,然後我們判斷訊息的緩衝區是否還有剩餘,如果沒有就將訊息從佇列中移除,如果佇列中還有訊息,那麼繼續發起write呼叫。

   重複一下,這裡引用的程式碼都是yanf4j aio分支中的原始碼,感興趣的朋友可以直接check out出來看看: http://yanf4j.googlecode.com/svn/branches/yanf4j-aio
   在引入了aio之後,java對於網路層的支援已經非常完善,該有的都有了,java也已經成為伺服器開發的首選語言之一。java的弱項在於對記憶體的管理上,由於這一切都交給了GC,因此在高效能的網路伺服器上還是Cpp的天下。java這種單一堆模型比之erlang的程序內堆模型還是有差距,很難做到高效的垃圾回收和細粒度的記憶體管理。

   這裡僅僅是介紹了aio開發的核心流程,對於一個網路框架來說,還需要考慮超時的處理、緩衝buffer的處理、業務層和網路層的切分、可擴充套件性、效能的可調性以及一定的通用性要求。

jdk7以前的nio是非阻塞IO,作業系統底層比方說linux,是用IO複用select實現的

epoll也不是非同步IO啊。非同步IO在linux上目前僅限於檔案系統,並且還沒有得到廣泛應用,很多平臺都沒有這玩意。
java aio在windows上是利用iocp實現的,這是真正的非同步IO。而在linux上,是通過epoll模擬非同步的

參考:

[1] NIO.2 入門,第 1 部分: 非同步通道 API http://www.ibm.com/developerworks/cn/java/j-nio2-1/

[2] NIO.2 入門,第 2 部分: 檔案系統 API:http://www.ibm.com/developerworks/cn/java/j-nio2-2/index.html