1. 程式人生 > >JAVA NIO非同步通訊框架MINA選型和使用的幾個細節(概述入門,UDP, 心跳)

JAVA NIO非同步通訊框架MINA選型和使用的幾個細節(概述入門,UDP, 心跳)

Apache MINA 2 是一個開發高效能和高可伸縮性網路應用程式的網路應用框架。它提供了一個抽象的事件驅動的非同步 API,可以使用 TCP/IP、UDP/IP、串列埠和虛擬機器內部的管道等傳輸方式。Apache MINA 2 可以作為開發網路應用程式的一個良好基礎。

    Apache MINA是非常著名的基於java nio的通訊框架,以前都是自己直接使用udp程式設計,新專案選型中考慮到網路通訊可能會用到多種通訊方式,因此使用了MINA。

     本文結構:

     (1)客戶端和伺服器程式碼;雖然是udp的,但是mina的優美的設計使得所有的通訊方式能夠以統一的形式使用,perfect。當然注意的是,不同的通訊方式,背後的機理和有效的變數、狀態是有區別的,所以要精通,那還是需要經驗積累和學習的。

     (2)超時和Session的幾個實際問題

     (3)心跳,糾正幾個錯誤

     既然是使用,廢話少說,直接整個可用的例子。當然了,這些程式碼也不是直接可用的,我們應用的邏輯有點複雜,不會這麼簡單使用的。

版本2.0 RC1

1.1 伺服器端

  1.                 NioDatagramAcceptor acceptor = new NioDatagramAcceptor(); 
  2.                 acceptor.setHandler(new MyIoHandlerAdapter());//你的業務處理,最簡單的,可以extends IoHandlerAdapter
  3. DefaultIoFilterChainBuilder chain = acceptor.getFilterChain(); 
  4. chain.addLast("keep-alive", new HachiKeepAliveFilterInMina()); //心跳
  5. chain.addLast("toMessageTyep", new MyMessageEn_Decoder());  
  6.               //將傳輸的資料轉換成你的業務資料格式。比如下面的是將資料轉換成一行行的文字
  7.                 //acceptor.getFilterChain().addLast("codec",new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"))));
  8. chain.addLast("logger", new LoggingFilter()); 
  9. DatagramSessionConfig dcfg = acceptor.getSessionConfig(); 
  10. dcfg.setReuseAddress(true); 
  11. acceptor.bind(new InetSocketAddress(ClusterContext.getHeartBeatPort())); 

1.2 客戶端

  1.               NioDatagramConnector connector = new NioDatagramConnector(); 
  2. connector.setConnectTimeoutMillis(60000L); 
  3. connector.setConnectTimeoutCheckInterval(10000); 
  4. connector.setHandler(handler); 
  5. DefaultIoFilterChainBuilder chain = connector.getFilterChain(); 
  6. chain.addLast("keep-alive", new HachiKeepAliveFilterInMina());//心跳
  7. chain.addLast("toMessageTyep", new MyMessageEn_Decoder()); 
  8. chain.addLast("logger", new LoggingFilter()); 
  9. ConnectFuture connFuture = connector.connect(new InetSocketAddress("10.1.1.1",8001)); 
  10. connFuture.awaitUninterruptibly(); 
  11. IoSession session = connFuture.getSession(); 
  12.                 //傳送訊息長整型 1000
  13.               IoBuffer buffer = IoBuffer.allocate(8); 
  14.               buffer.putLong(1000); 
  15.               buffer.flip(); 
  16.               session.write(buffer); 
  17.                  //關閉連線
  18.                  session.getCloseFuture().awaitUninterruptibly(); 
  19. connector.dispose(); 

2. 超時的幾個經驗總結:

    udp session預設是60秒鐘超時,此時狀態為closing,資料就發不出去了。

Session的介面是IoSession,udp的最終實現是NioSession。如果互動在60秒內不能處理完成,就需要使用Keep-alive機制,即心跳機制。

3. 心跳機制

    在程式碼中已經使用了心跳機制,是通過mina的filter實現的,mina自身帶的心跳機制好處在於,它附加了處理,讓心跳訊息不會傳到業務層,在底層就完成了。

    在上面程式碼實現中的HachiKeepAliveFilterInMina如下:

  1. publicclass HachiKeepAliveFilterInMina extends KeepAliveFilter { 
  2.     privatestaticfinalint INTERVAL = 30;//in seconds
  3.     privatestaticfinalint TIMEOUT = 10; //in seconds
  4.     public HachiKeepAliveFilterInMina(KeepAliveMessageFactory messageFactory) { 
  5.         super(messageFactory, IdleStatus.BOTH_IDLE, new ExceptionHandler(), INTERVAL, TIMEOUT); 
  6.     } 
  7.     public HachiKeepAliveFilterInMina() { 
  8.         super(new KeepAliveMessageFactoryImpl(), IdleStatus.BOTH_IDLE, new ExceptionHandler(), INTERVAL, TIMEOUT); 
  9.         this.setForwardEvent(false); //此訊息不會繼續傳遞,不會被業務層看見
  10.     } 
  11. class ExceptionHandler implements KeepAliveRequestTimeoutHandler {    
  12.     publicvoid keepAliveRequestTimedOut(KeepAliveFilter filter, IoSession session) throws Exception {    
  13.         System.out.println("Connection lost, session will be closed");    
  14.         session.close(true);  
  15.     }    
  16. /**
  17. * 繼承於KeepAliveMessageFactory,當心跳機制啟動的時候,需要該工廠類來判斷和定製心跳訊息
  18. * @author Liu Liu
  19. *
  20. */
  21. class KeepAliveMessageFactoryImpl implements KeepAliveMessageFactory { 
  22.     privatestaticfinalbyte int_req = -1
  23.     privatestaticfinalbyte int_rep = -2;  
  24.     privatestaticfinal IoBuffer KAMSG_REQ = IoBuffer.wrap(newbyte[]{int_req});    
  25.     privatestaticfinal IoBuffer KAMSG_REP = IoBuffer.wrap(newbyte[]{int_rep});   
  26.     public Object getRequest(IoSession session) {    
  27.         return KAMSG_REQ.duplicate();    
  28.     }    
  29.     public Object getResponse(IoSession session, Object request) {    
  30.         return KAMSG_REP.duplicate();    
  31.     }    
  32.     publicboolean isRequest(IoSession session, Object message) {   
  33.         if(!(message instanceof IoBuffer)) 
  34.             return false
  35.         IoBuffer realMessage = (IoBuffer)message; 
  36.         if(realMessage.limit() != 1
  37.             returnfalse
  38.         boolean result = (realMessage.get() == int_req); 
  39.         realMessage.rewind(); 
  40.         return result; 
  41.     }    
  42.     publicboolean isResponse(IoSession session, Object message) {     
  43.         if(!(message instanceof IoBuffer)) 
  44.             return false
  45.         IoBuffer realMessage = (IoBuffer)message; 
  46.         if(realMessage.limit() != 1
  47.             returnfalse
  48.         boolean result = (realMessage.get() == int_rep);    
  49.         realMessage.rewind(); 
  50.         return result; 
  51.     }    

  有人說:心跳機制的filter只需要伺服器端具有即可——這是錯誤的,拍著腦袋想一想,看看factory,你就知道了。心跳需要通訊兩端的實現

  另外,版本2.0 RC1中,經過測試,當心跳的時間間隔INTERVAL設定為60s(Session的存活時間)的時候心跳會失效,所以最好需要小於60s的間隔。

更多可參考:

以上只是初步使用的一點經驗,2和3的部分很難在網上找到,所以分享出來,因為時間關係、可能有些偏頗或者錯誤,望指正交流。