1. 程式人生 > >Mina 斷線重連機制

Mina 斷線重連機制

Mina 斷線重連

    定義:這裡討論的Mina 斷線重連是指使用mina作為客戶端軟體,連線其他提供Socket通訊服務的伺服器端。Socket伺服器可以是Mina提供的伺服器,也可以是C++提供的伺服器。



    

一、斷線重連的方式;

    1. 在建立Mina客戶端時增加一個監聽器,或者增加一個攔截器,當檢測到Session關閉時,自動進行重連。


    

    2. 在第1種方式的基礎上,增加客戶端的讀寫通道空閒檢查,當發生Session關閉或者讀寫空閒時,進行重連。


    

        第一種方式比較傳統,優點是簡單方便,適合網路穩定、資料量不大(1M頻寬以下)的環境;不過缺點是不能對系統級的連線斷開阻塞進行捕獲。

        第二種方式更加精細,基本上能捕獲到應用、網路、系統級的斷連。

二、重連目的:

        在使用Mina做為客戶端時,往往因為網路、伺服器、應用程式出現問題而導致連線斷開,而自動重連,就是解決連線斷開的唯一方式。如果網線斷開、伺服器宕機、應用程式掛了,都是斷線的原因,這個時候,通過增加一個監聽器或者攔截器,就能實現重連。但是生產環境中,斷線的原因可能更復雜:網路不穩定、延時、伺服器負載高、伺服器或者應用程式的傳送或者接收緩衝區滿等等問題都可能導致資料傳輸過程出現類似於斷線的情況,這個時候,光檢測Session關閉是遠遠不夠的,這個時候就需要一種重連機制,比如讀寫空閒超過30秒,就進行重連。對於資料不間斷、實時性高、資料量大的應用場景,更是實用。

三、例項:

    第一種:監聽器方式

       建立一個監聽器實現mina的IoServiceListener介面,裡面的方法可以不用寫實現

Java程式碼  收藏程式碼
  1. <span style="font-family: 'Microsoft YaHei',微軟雅黑,SimHei,tahoma,arial,helvetica,sans-serif;">import org.apache.mina.core.service.IoService;  
  2. import org.apache.mina.core.service.IoServiceListener;  
  3. import
     org.apache.mina.core.session.IdleStatus;  
  4. import org.apache.mina.core.session.IoSession;  
  5. public class IoListener implements IoServiceListener{  
  6.     @Override  
  7.     public void serviceActivated(IoService arg0) throws Exception {  
  8.         // TODO Auto-generated method stub  
  9.     }  
  10.     @Override  
  11.     public void serviceDeactivated(IoService arg0) throws Exception {  
  12.         // TODO Auto-generated method stub  
  13.     }  
  14.     @Override  
  15.     public void serviceIdle(IoService arg0, IdleStatus arg1) throws Exception {  
  16.         // TODO Auto-generated method stub  
  17.     }  
  18.     @Override  
  19.     public void sessionCreated(IoSession arg0) throws Exception {  
  20.         // TODO Auto-generated method stub  
  21.     }  
  22.     @Override  
  23.     public void sessionDestroyed(IoSession arg0) throws Exception {  
  24.         // TODO Auto-generated method stub  
  25.     }  
  26. }</span>  

再建立客戶端時加入監聽

Java程式碼  收藏程式碼
  1. <span style="font-family: 'Microsoft YaHei',微軟雅黑,SimHei,tahoma,arial,helvetica,sans-serif;">        NioSocketConnector connector = new NioSocketConnector();  //建立連線客戶端  
  2.         connector.setConnectTimeoutMillis(30000); //設定連線超時  
  3.         connector.getSessionConfig().setReceiveBufferSize(10240);   // 設定接收緩衝區的大小  
  4.         connector.getSessionConfig().setSendBufferSize(10240);// 設定輸出緩衝區的大小  
  5. //      加入解碼器  
  6.         TextLineCodecFactory factory = new TextLineCodecFactory(Charset.forName("GBK"), LineDelimiter.WINDOWS.getValue(), LineDelimiter.WINDOWS.getValue());  
  7.         factory.setDecoderMaxLineLength(10240);  
  8.         factory.setEncoderMaxLineLength(10240);  
  9.         connector.getFilterChain().addLast("codec"new ProtocolCodecFilter(factory));  
  10.         connector.setDefaultRemoteAddress(new InetSocketAddress(host, port));// 設定預設訪問地址  
  11.         //新增處理器  
  12.                 connector.setHandler(new IoHandler());   
  13.                 // 新增重連監聽  
  14.         connector.addListener(new IoListener() {  
  15.             @Override  
  16.             public void sessionDestroyed(IoSession arg0) throws Exception {  
  17.                 for (;;) {  
  18.                     try {  
  19.                         Thread.sleep(3000);  
  20.                         ConnectFuture future = connector.connect();  
  21.                         future.awaitUninterruptibly();// 等待連線建立成功  
  22.                         session = future.getSession();// 獲取會話  
  23.                         if (session.isConnected()) {  
  24.                             logger.info("斷線重連[" + connector.getDefaultRemoteAddress().getHostName() + ":" + connector.getDefaultRemoteAddress().getPort() + "]成功");  
  25.                             break;  
  26.                         }  
  27.                     } catch (Exception ex) {  
  28.                         logger.info("重連伺服器登入失敗,3秒再連線一次:" + ex.getMessage());  
  29.                     }  
  30.                 }  
  31.             }  
  32.         });  
  33.                 for (;;) {  
  34.             try {  
  35.                 ConnectFuture future = connector.connect();  
  36.                 future.awaitUninterruptibly(); // 等待連線建立成功    
  37.                         session = future.getSession(); // 獲取會話     
  38.                 logger.info("連線服務端" + host + ":" + port + "[成功]" + ",,時間:" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));  
  39.                 break;  
  40.             } catch (RuntimeIoException e) {  
  41.                 logger.error("連線服務端" + host + ":" + port + "失敗" + ",,時間:" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + ", 連線MSG異常,請檢查MSG埠、IP是否正確,MSG服務是否啟動,異常內容:" + e.getMessage(), e);  
  42.                 Thread.sleep(5000);// 連線失敗後,重連間隔5s  
  43.             }  
  44.         }  
  45. </span>  

    第一種:攔截器方式

Java程式碼  收藏程式碼
  1. <span style="font-family: 'Microsoft YaHei',微軟雅黑,SimHei,tahoma,arial,helvetica,sans-serif;">        connector = new NioSocketConnector();  //建立連線客戶端  
  2.         connector.setConnectTimeoutMillis(30000); //設定連線超時  
  3. //      斷線重連回調攔截器  
  4.         connector.getFilterChain().addFirst("reconnection"new IoFilterAdapter() {  
  5.             @Override  
  6.             public void sessionClosed(NextFilter nextFilter, IoSession ioSession) throws Exception {  
  7.                 for(;;){  
  8.                     try{  
  9.                         Thread.sleep(3000);  
  10.                         ConnectFuture future = connector.connect();  
  11.                         future.awaitUninterruptibly();// 等待連線建立成功  
  12.                         session = future.getSession();// 獲取會話  
  13.                         if(session.isConnected()){  
  14.                             logger.info("斷線重連["+ connector.getDefaultRemoteAddress().getHostName() +":"+ connector.getDefaultRemoteAddress().getPort()+"]成功");  
  15.                             break;  
  16.                         }  
  17.                     }catch(Exception ex){  
  18.                         logger.info("重連伺服器登入失敗,3秒再連線一次:" + ex.getMessage());  
  19.                     }  
  20.                 }  
  21.             }  
  22.         });  
  23.         TextLineCodecFactory factory = new TextLineCodecFactory(Charset.forName(encoding), LineDelimiter.WINDOWS.getValue(), LineDelimiter.WINDOWS.getValue());  
  24.         factory.setDecoderMaxLineLength(10240);  
  25.         factory.setEncoderMaxLineLength(10240);  
  26.         //加入解碼器  
  27.         connector.getFilterChain().addLast("codec"new ProtocolCodecFilter(factory));  
  28.         //新增處理器  
  29.                 connector.setHandler(new IoHandler());  
  30.         connector.getSessionConfig().setReceiveBufferSize(10240);   // 設定接收緩衝區的大小  
  31.         connector.getSessionConfig().setSendBufferSize(10240);          // 設定輸出緩衝區的大小  
  32.         connector.setDefaultRemoteAddress(new InetSocketAddress(host, port));// 設定預設訪問地址  
  33.         for (;;) {  
  34.             try {  
  35.                 ConnectFuture future = connector.connect();  
  36.                 // 等待連線建立成功  
  37.                 future.awaitUninterruptibly();  
  38.                 // 獲取會話  
  39.                 session = future.getSession();  
  40.                 logger.error("連線服務端" + host + ":" + port + "[成功]" + ",,時間:" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));  
  41.                 break;  
  42.             } catch (RuntimeIoException e) {  
  43.                 logger.error("連線服務端" + host + ":" + port + "失敗" + ",,時間:" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + ", 連線MSG異常,請檢查MSG埠、IP是否正確,MSG服務是否啟動,異常內容:" + e.getMessage(), e);  
  44.                 Thread.sleep(5000);// 連線失敗後,重連間隔5s  
  45.             }  
  46.         }</span>  

第二種:加入空閒檢測機制

        空閒檢測機制需要在建立客戶端時,加入空閒超時,然後在處理器handler端的sessionIdle方法中加入一個預關閉連線的方法。讓Session關閉傳遞到監聽器或者攔截器的sessionClose方法中實現重連。

      以攔截器方式為例,在建立客戶端時,加入讀寫通道空閒檢查超時機制。

Java程式碼  收藏程式碼
  1. <span style="font-family: 'Microsoft YaHei',微軟雅黑,SimHei,tahoma,arial,helvetica,sans-serif;">        connector = new NioSocketConnector();  //建立連線客戶端  
  2.         connector.setConnectTimeoutMillis(30000); //設定連線超時  
  3. //      斷線重連回調攔截器  
  4.         connector.getFilterChain().addFirst("reconnection"new IoFilterAdapter() {  
  5.             @Override  
  6.             public void sessionClosed(NextFilter nextFilter, IoSession ioSession) throws Exception {  
  7.                 for(;;){  
  8.                     try{  
  9.                         Thread.sleep(3000);  
  10.                         ConnectFuture future = connector.connect();  
  11.                         future.awaitUninterruptibly();// 等待連線建立成功  
  12.                         session = future.getSession();// 獲取會話  
  13.                         if(session.isConnected()){  
  14.                             logger.info("斷線重連["+ connector.getDefaultRemoteAddress().getHostName() +":"+ connector.getDefaultRemoteAddress().getPort()+"]成功");  
  15.                             break;  
  16.                         }  
  17.                     }catch(Exception ex){  
  18.                         logger.info("重連伺服器登入失敗,3秒再連線一次:" + ex.getMessage());  
  19.                     }  
  20.                 }  
  21.             }  
  22.         });  
  23.         connector.getFilterChain().addLast("mdc"new MdcInjectionFilter());  
  24.         TextLineCodecFactory factory = new TextLineCodecFactory(Charset.forName(encoding), LineDelimiter.WINDOWS.getValue(), LineDelimiter.WINDOWS.getValue());  
  25.         factory.setDecoderMaxLineLength(10240);  
  26.         factory.setEncoderMaxLineLength(10240);  
  27.         //加入解碼器  
  28.         connector.getFilterChain().addLast("codec"new ProtocolCodecFilter(factory));  
  29.         connector.getSessionConfig().setReceiveBufferSize(10240);   // 設定接收緩衝區的大小  
  30.         connector.getSessionConfig().setSendBufferSize(10240);// 設定輸出緩衝區的大小  
  31.         connector.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 30000);  //讀寫都空閒時間:30秒  
  32.         connector.getSessionConfig().setIdleTime(IdleStatus.READER_IDLE, 40000);//讀(接收通道)空閒時間:40秒  
  33.         connector.getSessionConfig().setIdleTime(IdleStatus.WRITER_IDLE, 50000);//寫(傳送通道)空閒時間:50秒  
  34.         

    相關推薦

    Mina 機制

    Mina 斷線重連     定義:這裡討論的Mina 斷線重連是指使用mina作為客戶端軟體,連線其他提供Socket通訊服務的伺服器端。Socket伺服器可以是Mina提供的伺服器,也可以是C++提供的伺服器。      一、斷線重連的方式;     1.

    ActiveMQ的機制

    primary active 節點 語法 無限 機制 新的 bubuko 獲取 斷線重連機制是ActiveMQ的高可用性具體體現之一。ActiveMQ提供failover機制去實現斷線重連的高可用性,可以使得連接斷開之後,不斷的重試連接到一個或多個brokerURL。 默認

    即時通訊判斷網路狀態和機制

    本文借鑑csdn大神way的xmpp客戶端學習改造而來,不足之處希望大家多多指教!  1. 由於近半年來一直寫針對於tigase伺服器的即時通訊軟體的開發,框架的重構,對即時通訊的理解也較之前更進一步,在客戶端的IM開發中,最重要的除去通訊的建立,就是保持網路環境不斷更換時

    【android學習】機制

    【解決問題】 android端連線網路之後,當網路斷開連線時,為了提高使用者體驗,android自動檢測網路,當有網路時,使用者無需進行多餘操作,android端自動重新連線網路。 【解決方案】 1、設定網路斷開連線標誌:NET_BAD 1)每次傳送資料,若傳送資料不成功,

    長連線 、短連線、心跳機制(轉載) Socket的長連線和短連線

    概述 可承遇到,不知什麼原因,一個夜晚,機房中,大片的遠端呼叫連線斷開。 第二天早上,使用者訪問高峰,大部分伺服器都在獲取連線,造成大片網路阻塞。 服務崩潰,慘不忍睹的景象。 本文將從長連線和短連線的概念切入,再到長連線與短連線的區別,以及應用場景,引出心跳機制和斷線重連,給出程式碼實現。 從原

    接 、短接、心跳機制(轉載)

    http ase 地址 出現異常 好處 失效 能力 shutdown 根據 概述 可承遇到,不知什麽原因,一個夜晚,機房中,大片的遠程調用連接斷開。 第二天早上,用戶訪問高峰,大部分服務器都在獲取連接,造成大片網絡阻塞。 服務崩潰,慘不忍睹的景象。 本文將從長連接和短連接

    Apache mina 入門(四) —— 客戶端長連線方式實現監聽

    通過前面 Apache Mina 入門 (二)—— 非同步通訊機制 我們可以實現一個長連線的客戶端。但會發現一個問題,就是當網路、伺服器、應用程式出現問題而導致連線斷開後,我們的客戶端不能自動重連伺服器。導致客戶端程式癱瘓,不能使用。這個時候,通過增加一

    mina作為客戶端

    定義:這裡討論的Mina 斷線重連是指使用mina作為客戶端軟體,連線其他提供Socket通訊服務的伺服器端。Socket伺服器可以是Mina提供的伺服器,也可以是C++提供的伺服器。      一、斷線重連的方式;     1. 在建立Mina客戶端時增加一個

    nodejs中mysql

    調試 狀態 pan var 參考 ble prot nec clas 之前寫了個小程序Node News,用到了MySQL數據庫,在本地測試均沒神馬問題。放上服務器運行一段時間後,偶然發現打開頁面的時候頁面一直處於等待狀態,直到Nginx返回超時錯誤。於是上服務器檢查了遍,

    android 實現mqtt訊息推送,以及不停的問題解決

    前段時間專案用到mqtt的訊息推送,整理一下程式碼,程式碼的原型是網上找的,具體哪個地址已經忘記了。 程式碼的實現是新建了一個MyMqttService,全部功能都在裡面實現,包括連伺服器,斷線重連,訂閱訊息,處理訊息,釋出訊息等基本操作。 首先新增依賴: dependencies { &

    django的資料庫

    django每次查詢都會被把連線關閉,想保持長連線方法如下: from django.core import signals from django.db import close_connection # 取消訊號關聯,實現資料庫長連線 signals.request

    【UE4】 第12講 FSocket

    (版權宣告,禁止轉載) 【第03講】 實現了FSocket的連網基礎功能,這一講實現一下 斷線重連 <如果資深前輩發現有理解錯誤,還請不吝指正> <1> 建立Socket,設定阻塞模式(預設創建出來的就是阻塞模式,不用設定)  

    teamtalk socket問題的查詢

    之前從teamtalk的核心庫裡面剝離出一個跨平臺網路庫,一開始用的好好的,可是在某些地方使用的時候總是出怪問題,有時候斷線重連就一直連不上,導致應用失聯,在實際使用場景中一直出問題,好不尷尬。 經過連三天的苦思冥想,除錯程式碼看輸出終於有些眉目了。 我是從以下幾個方面著手解決的,特此記錄一

    關於資料庫的一點點思考

    最近在寫資料庫連結池,一個不可逃避的問題就是資料庫斷線重連。 查了很多資料,因為公司有很多專案用了 TP5 於是也去看了它的原始碼。 tp5的實現其實很簡單,配置了一些資料庫連線相關的錯誤資訊關鍵詞(句),然後在執行語句時 catch 異常資訊進行比對: // 伺服器斷線標識字元 p

    Yii2實現mysql[轉載]

    最近遇到“Yii2實現mysql斷線重連”問題,找了好久資料,最後找到這篇文件是說明了該情況的,感謝這位作者的分享,記錄下來,必備以後查閱。 原文連結:https://www.yiichina.com/topic/7296 Yii2實現資料庫斷線重連 一、前話 在工作中,有時候一

    netty4.0 心跳檢測與操作

    因為最近專案最近要用netty,服務端放在雲端,客戶端發在內網。那如何實現netty長連線和斷線重連呢(網路故障或者其他原因,客戶端要無限取重連服務端)。接下來我們看一下如何實現這個兩個功能呢。 服務端程式碼如下: package com.example.nettydem

    mqtt協議 springboot2.0.4 mqttv3 釋出訂閱程式碼呼叫,mqtt

    mqttv3 釋出訂閱程式碼呼叫 我用的是springboot2.0.4 直接上程式碼: pom.xml <dependency> <groupId>org.eclipse.paho</groupId>

    iOS MQTT使用案例 ()

    iOS MQTT使用案例 (斷線重連) 參考了 iOS MQTT—-MQTTClient實戰-看這篇的就夠了 大神寫的這篇 git: MQTT-Client-Framework 介紹啥的看百度,上面大神寫的就行了,直接上乾貨。 安裝: pod 'MQTTClient'

    Node.js連線RabbitMQ,,動態繫結routing key

    RabbitMQ官方提供的教程https://www.rabbitmq.com/tuto...,是基於回撥的。 下面將給出基於Promise式的寫法。並且實現動態的佇列繫結 初始化配置 const amqp = require('amqplib') // rabbitMQ地址 const {amqpA

    WebSocket在服務端和客戶端通訊demo,支援心跳檢測+

    一、為什麼需要 WebSocket? 初次接觸 WebSocket 的人,都會問同樣的問題:我們已經有了 HTTP 協議,為什麼還需要另一個協議?它能帶來什麼好處? 答案很簡單,因為 HTTP 協議有一個缺陷:通訊只能由客戶端發起。 舉例來說,我們想了解今天的天氣,只能是客戶端向伺服器發出