1. 程式人生 > >mina作為客戶端斷線重連

mina作為客戶端斷線重連

定義:這裡討論的Mina 斷線重連是指使用mina作為客戶端軟體,連線其他提供Socket通訊服務的伺服器端。Socket伺服器可以是Mina提供的伺服器,也可以是C++提供的伺服器。


    

一、斷線重連的方式;

    1. 在建立Mina客戶端時增加一個監聽器,或者增加一個攔截器,當檢測到Session關閉時,自動進行重連。


    

    2. 在第1種方式的基礎上,增加客戶端的讀寫通道空閒檢查,當發生Session關閉或者讀寫空閒時,進行重連。


    

        第一種方式比較傳統,優點是簡單方便,適合網路穩定、資料量不大(1M頻寬以下)的環境;不過缺點是不能對系統級的連線斷開阻塞進行捕獲。

        第二種方式更加精細,基本上能捕獲到應用、網路、系統級的斷連。

二、重連目的:

        在使用Mina做為客戶端時,往往因為網路、伺服器、應用程式出現問題而導致連線斷開,而自動重連,就是解決連線斷開的唯一方式。如果網線斷開、伺服器宕機、應用程式掛了,都是斷線的原因,這個時候,通過增加一個監聽器或者攔截器,就能實現重連。但是生產環境中,斷線的原因可能更復雜:網路不穩定、延時、伺服器負載高、伺服器或者應用程式的傳送或者接收緩衝區滿等等問題都可能導致資料傳輸過程出現類似於斷線的情況,這個時候,光檢測Session關閉是遠遠不夠的,這個時候就需要一種重連機制,比如讀寫空閒超過30秒,就進行重連。對於資料不間斷、實時性高、資料量大的應用場景,更是實用。

三、例項:

    第一種:監聽器方式

       建立一個監聽器實現mina的IoServiceListener介面,裡面的方法可以不用寫實現

import org.apache.mina.core.service.IoServiceListener;  
import org.apache.mina.core.session.IdleStatus;  
import org.apache.mina.core.session.IoSession;  
  
public class IoListener implements IoServiceListener{  
    @Override  
    public void serviceActivated(IoService arg0) throws Exception {  
        // TODO Auto-generated method stub  
    }  
    @Override  
    public void serviceDeactivated(IoService arg0) throws Exception {  
        // TODO Auto-generated method stub  
    }  
    @Override  
    public void serviceIdle(IoService arg0, IdleStatus arg1) throws Exception {  
        // TODO Auto-generated method stub  
    }  
    @Override  
    public void sessionCreated(IoSession arg0) throws Exception {  
        // TODO Auto-generated method stub  
    }  
  
    @Override  
    public void sessionDestroyed(IoSession arg0) throws Exception {  
        // TODO Auto-generated method stub  
    }  

再建立客戶端時加入監聽
 NioSocketConnector connector = new NioSocketConnector();  //建立連線客戶端  
        connector.setConnectTimeoutMillis(30000); //設定連線超時  
        connector.getSessionConfig().setReceiveBufferSize(10240);   // 設定接收緩衝區的大小  
        connector.getSessionConfig().setSendBufferSize(10240);// 設定輸出緩衝區的大小  
//      加入解碼器  
        TextLineCodecFactory factory = new TextLineCodecFactory(Charset.forName("GBK"), LineDelimiter.WINDOWS.getValue(), LineDelimiter.WINDOWS.getValue());  
        factory.setDecoderMaxLineLength(10240);  
        factory.setEncoderMaxLineLength(10240);  
        connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(factory));  
        connector.setDefaultRemoteAddress(new InetSocketAddress(host, port));// 設定預設訪問地址  
        //新增處理器  
                connector.setHandler(new IoHandler());   
          
                // 新增重連監聽  
        connector.addListener(new IoListener() {  
            @Override  
            public void sessionDestroyed(IoSession arg0) throws Exception {  
                for (;;) {  
                    try {  
                        Thread.sleep(3000);  
                        ConnectFuture future = connector.connect();  
                        future.awaitUninterruptibly();// 等待連線建立成功  
                        session = future.getSession();// 獲取會話  
                        if (session.isConnected()) {  
                            logger.info("斷線重連[" + connector.getDefaultRemoteAddress().getHostName() + ":" + connector.getDefaultRemoteAddress().getPort() + "]成功");  
                            break;  
                        }  
                    } catch (Exception ex) {  
                        logger.info("重連伺服器登入失敗,3秒再連線一次:" + ex.getMessage());  
                    }  
                }  
            }  
        });  
                for (;;) {  
            try {  
                ConnectFuture future = connector.connect();  
                future.awaitUninterruptibly(); // 等待連線建立成功    
                        session = future.getSession(); // 獲取會話     
                logger.info("連線服務端" + host + ":" + port + "[成功]" + ",,時間:" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));  
                break;  
            } catch (RuntimeIoException e) {  
                logger.error("連線服務端" + host + ":" + port + "失敗" + ",,時間:" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + ", 連線MSG異常,請檢查MSG埠、IP是否正確,MSG服務是否啟動,異常內容:" + e.getMessage(), e);  
                Thread.sleep(5000);// 連線失敗後,重連間隔5s  
            }  
        }  

第一種:攔截器方式
connector = new NioSocketConnector();  //建立連線客戶端  
        connector.setConnectTimeoutMillis(30000); //設定連線超時  
//      斷線重連回調攔截器  
        connector.getFilterChain().addFirst("reconnection", new IoFilterAdapter() {  
            @Override  
            public void sessionClosed(NextFilter nextFilter, IoSession ioSession) throws Exception {  
                for(;;){  
                    try{  
                        Thread.sleep(3000);  
                        ConnectFuture future = connector.connect();  
                        future.awaitUninterruptibly();// 等待連線建立成功  
                        session = future.getSession();// 獲取會話  
                        if(session.isConnected()){  
                            logger.info("斷線重連["+ connector.getDefaultRemoteAddress().getHostName() +":"+ connector.getDefaultRemoteAddress().getPort()+"]成功");  
                            break;  
                        }  
                    }catch(Exception ex){  
                        logger.info("重連伺服器登入失敗,3秒再連線一次:" + ex.getMessage());  
                    }  
                }  
            }  
        });  
          
        TextLineCodecFactory factory = new TextLineCodecFactory(Charset.forName(encoding), LineDelimiter.WINDOWS.getValue(), LineDelimiter.WINDOWS.getValue());  
        factory.setDecoderMaxLineLength(10240);  
        factory.setEncoderMaxLineLength(10240);  
        //加入解碼器  
        connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(factory));  
        //新增處理器  
                connector.setHandler(new IoHandler());  
        connector.getSessionConfig().setReceiveBufferSize(10240);   // 設定接收緩衝區的大小  
        connector.getSessionConfig().setSendBufferSize(10240);          // 設定輸出緩衝區的大小  
        connector.setDefaultRemoteAddress(new InetSocketAddress(host, port));// 設定預設訪問地址  
        for (;;) {  
            try {  
                ConnectFuture future = connector.connect();  
                // 等待連線建立成功  
                future.awaitUninterruptibly();  
                // 獲取會話  
                session = future.getSession();  
                logger.error("連線服務端" + host + ":" + port + "[成功]" + ",,時間:" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));  
                break;  
            } catch (RuntimeIoException e) {  
                logger.error("連線服務端" + host + ":" + port + "失敗" + ",,時間:" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + ", 連線MSG異常,請檢查MSG埠、IP是否正確,MSG服務是否啟動,異常內容:" + e.getMessage(), e);  
                Thread.sleep(5000);// 連線失敗後,重連間隔5s  
            }  
        }

第二種:加入空閒檢測機制

        空閒檢測機制需要在建立客戶端時,加入空閒超時,然後在處理器handler端的sessionIdle方法中加入一個預關閉連線的方法。讓Session關閉傳遞到監聽器或者攔截器的sessionClose方法中實現重連。

      以攔截器方式為例,在建立客戶端時,加入讀寫通道空閒檢查超時機制。


connector = new NioSocketConnector();  //建立連線客戶端  
        connector.setConnectTimeoutMillis(30000); //設定連線超時  
//      斷線重連回調攔截器  
        connector.getFilterChain().addFirst("reconnection", new IoFilterAdapter() {  
            @Override  
            public void sessionClosed(NextFilter nextFilter, IoSession ioSession) throws Exception {  
                for(;;){  
                    try{  
                        Thread.sleep(3000);  
                        ConnectFuture future = connector.connect();  
                        future.awaitUninterruptibly();// 等待連線建立成功  
                        session = future.getSession();// 獲取會話  
                        if(session.isConnected()){  
                            logger.info("斷線重連["+ connector.getDefaultRemoteAddress().getHostName() +":"+ connector.getDefaultRemoteAddress().getPort()+"]成功");  
                            break;  
                        }  
                    }catch(Exception ex){  
                        logger.info("重連伺服器登入失敗,3秒再連線一次:" + ex.getMessage());  
                    }  
                }  
            }  
        });  
          
        connector.getFilterChain().addLast("mdc", new MdcInjectionFilter());  
        TextLineCodecFactory factory = new TextLineCodecFactory(Charset.forName(encoding), LineDelimiter.WINDOWS.getValue(), LineDelimiter.WINDOWS.getValue());  
        factory.setDecoderMaxLineLength(10240);  
        factory.setEncoderMaxLineLength(10240);  
        //加入解碼器  
        connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(factory));  
  
        connector.getSessionConfig().setReceiveBufferSize(10240);   // 設定接收緩衝區的大小  
        connector.getSessionConfig().setSendBufferSize(10240);// 設定輸出緩衝區的大小  
          
        connector.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 30000);  //讀寫都空閒時間:30秒  
        connector.getSessionConfig().setIdleTime(IdleStatus.READER_IDLE, 40000);//讀(接收通道)空閒時間:40秒  
        connector.getSessionConfig().setIdleTime(IdleStatus.WRITER_IDLE, 50000);//寫(傳送通道)空閒時間:50秒  
          
        //新增處理器  
                connector.setHandler(new IoHandler());   
          
        connector.setDefaultRemoteAddress(new InetSocketAddress(host, port));// 設定預設訪問地址  
        for (;;) {  
            try {  
                ConnectFuture future = connector.connect();  
                // 等待連線建立成功  
                future.awaitUninterruptibly();  
                // 獲取會話  
                session = future.getSession();  
                logger.error("連線服務端" + host + ":" + port + "[成功]" + ",,時間:" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));  
                break;  
            } catch (RuntimeIoException e) {  
                System.out.println("連線服務端" + host + ":" + port + "失敗" + ",,時間:" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + ", 連線MSG異常,請檢查MSG埠、IP是否正確,MSG服務是否啟動,異常內容:" + e.getMessage());  
                logger.error("連線服務端" + host + ":" + port + "失敗" + ",,時間:" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + ", 連線MSG異常,請檢查MSG埠、IP是否正確,MSG服務是否啟動,異常內容:" + e.getMessage(), e);  
                Thread.sleep(5000);// 連線失敗後,重連10次,間隔30s  
            }  
        }

然後在資料處理器IoHandler中sessionIdle方法中加入Session會話關閉的程式碼,這樣session關閉就能傳遞到攔截器或者監聽器中,然後實現重連。
import org.apache.mina.core.service.IoHandlerAdapter;  
import org.apache.mina.core.session.IdleStatus;  
import org.apache.mina.core.session.IoSession;  
  
public class IoHandler extends IoHandlerAdapter {  
    //部分程式碼忽略...  
    @Override  
    public void sessionIdle(IoSession session, IdleStatus status) throws Exception {  
        logger.info("-客戶端與服務端連線[空閒] - " + status.toString());  
        if(session != null){  
            session.close(true);  
        }  
    }  
    //部分程式碼忽略...  
}

總結-最佳實踐:

       以上兩種方式我個人認為最好是使用第二種。在實際的生產環境,對於資料量比較少的情況下,需要加一個執行緒專門傳送心跳資訊,然後在伺服器端進行迴應心跳,這樣就保證讀寫通道不出現空閒。如果資料量比較大,大到24小時都有資料,那麼就不需要心跳執行緒,可以直接在IoHandler處理器端中messageReceived方法中定時傳送心跳到伺服器。由於讀寫監控還可以處理伺服器、網路、應用等等方面的不確定因素,所以建議使用第二種方式。