Netty — 心跳檢測和斷線重連
一.前言
由於在通信層的網絡連接的不可靠性,比如:網絡閃斷,網絡抖動等,經常會出現連接斷開。這樣對於使用長連接的應用而言,當突然高流量沖擊勢必會造成進行網絡連接,從而產生網絡堵塞,應用響應速度下降,延遲上升,用戶體驗較差。
在通信層的高可用設計中,需要保活長連接的網絡,保證通信能夠正常。一般有兩種設計方式:
- 利用TCP提供的連接保活特性
- 應用層做連接保活
本文主要介紹使用netty時應用層如何做連接保活,提高應用的可用性。
二.TCP連接保活性的局限
TCP協議層面提供了KeepAlive的機制保證連接的活躍,但是其有很多劣勢:
- 該保活機制非TCP協議的標準,默認是關閉
- 該機制依賴操作系統,需要進行系統級配置,不夠靈活方便
- 當應用底層傳輸協議變更時,將無法適用
由於以上的原因,絕大多數的框架、應用處理連接的保活性都是在應用層處理。目前的主流方案是心跳檢測,斷線重連。
三.應用層保證連接的活躍性
1.心跳檢測
心跳檢測機制:客戶端每隔一段時間發送PING消息給服務端,服務端接受到後回復PONG消息。客戶端如果在一定時間內沒有收到PONG響應,則認為連接斷開,服務端如果在一定時間內沒有收到來自客戶端的PING請求,則認為連接已經斷開。通過這種來回的PING-PONG消息機制偵測連接的活躍性。
netty本身也提供了IdleStateHandler用於檢測連接閑置,該Handler可以檢測連接未發生讀寫事件而觸發相應事件。
首先編寫客戶端心跳檢測的Handler:
/** * 心跳檢測: * 1. client發送"PING"消息 * * @author huaijin */ public class ClientHeartBeatHandler extends ChannelHandlerAdapter { /** * PING消息 */ private static final String PING = "0"; /** * PONG消息 */ private static final String PONG = "1"; /** * 分隔符 */ private static final String SPLIT = "$_"; /** * 讀取到服務端響應,如果是PONG響應,則打印。如果是非PONG響應,則傳遞至下一個Handler * * @param ctx 處理上下文 * @param msg 消息 * @throws Exception * @author huaijin */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (PONG.equals(msg)) { System.out.println("from heart bean: " + msg); } else { ctx.fireChannelRead(msg); } } /** * 處理觸發的事件,如果是{@link IdleStateEvent},則判斷是讀或者是寫。如果是du,則斷開連接; * 如果是寫,則發送PING消息 * * @param ctx 處理上下文 * @param evt 事件 * @throws Exception * @author huaijin */ @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { IdleStateEvent idleStateEvent = (IdleStateEvent) evt; switch (idleStateEvent.state()) { case WRITER_IDLE: sendPing(ctx); break; case READER_IDLE: System.out.println("client close connection."); closeConnection(ctx); break; case ALL_IDLE: closeConnection(ctx); break; default: break; } } } /** * 發送PING消息 * * @param ctx 上下文 * @author huaijin */ private void sendPing(ChannelHandlerContext ctx) { System.out.println("send heart beat: " + PING); ctx.writeAndFlush(Unpooled.copiedBuffer((PING + SPLIT).getBytes())); } /** * 關閉連接 * * @param ctx * @author huaijin */ private void closeConnection(ChannelHandlerContext ctx) { ctx.disconnect(); ctx.close(); } }
然後再編寫服務單心跳檢測Handler:
/**
* 心跳檢測:
* 1. server端接受到"PING",返回"PONG"消息
*
* @author huaijin
*/
public class ServerHeartBeatHandler extends ChannelHandlerAdapter {
/**
* PONG消息
*/
private static final String PONG = "1";
/**
* PING消息
*/
private static final String PING = "0";
/**
* 消息分隔符
*/
private static final String SPLIT = "$_";
/**
* 如果是PING消息,則相應PONG。如果非,則傳遞至下個Handler
*
* @param ctx 上下文
* @param msg 消息
* @throws Exception
* @author huaijin
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (PING.equals(msg)) {
System.out.println("from heart beat: " + msg);
sendPong(ctx);
} else {
ctx.fireChannelRead(msg);
}
}
/**
* 處理觸發事件,如果是讀事件,則關閉連接
*
* @param ctx 上下文
* @param evt 事件
* @throws Exception
* @author huaijin
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt)
throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
if (idleStateEvent.state() == READER_IDLE) {
System.out.println("server close connection.");
closeConnection(ctx);
}
}
}
/**
* 發送PONG消息
*
* @param ctx 上下文
* @author huaijin
*/
private void sendPong(ChannelHandlerContext ctx) {
System.out.println("send heart bean: " + PONG);
ctx.writeAndFlush(Unpooled.copiedBuffer((PONG + SPLIT).getBytes()));
}
/**
* 關閉連接
*
* @param ctx 上下文
* @author huaijin
*/
private void closeConnection(ChannelHandlerContext ctx) {
ctx.disconnect();
ctx.close();
}
}
通過以上的ClientHeartbeatHandler和ServerHeartBeatHandler和netty本身提供的IdleStateHandler能夠完成心跳檢測。
Note:
但是IdleStateHandler中有未讀和未寫的事件設置,這裏需要非常著重註意。客戶端的為讀時間最好設置為服務端的未寫時間的兩倍,服務端的未讀時間最好設置為客戶端的未寫時間的兩倍。
2.斷線重連
當心跳檢測發現連接斷開後,為了保證通信層的可用性,仍然需要重新連接,保證通信的可靠。對於短線重連一般有兩種設計方式比較常見:
- 通過額外的線程定時輪循所有的連接的活躍性,如果發現其中有死連接,則執行重連
- 監聽連接上發送的斷開事件,如果發送則執行重連操作
這裏我們首先看下第一種實現方式,netty中當Bootstrap執行connect操作後,會獲得ChannelFuture對象,在該對象上執行close事件的監聽,如果發生了close則提交重連操作。
public void connect(int port, String host) throws InterruptedException {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes());
ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new IdleStateHandler(
10, 5, 10));
ch.pipeline().addLast(new ClientHeartBeatHandler());
ch.pipeline().addLast(new EchoClientHandler());
}
});
ChannelFuture f = b.connect(host, port).sync();
// 監聽channel上的close事件
f.channel().closeFuture().sync();
} finally {
// 提交重連操作
executor.execute(() -> {
try {
System.out.println("reconnection to: " + "127.0.0.1:8080");
connect(8080, "127.0.0.1");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
}
public static void main(String[] args) throws InterruptedException {
new EchoClient().connect(8080, "127.0.0.1");
Thread.currentThread().join();
}
但是該種方式對於應用而言,需要每個連接都有重連的線程,這樣對於資源消耗比較大。建議采用第二種情況,使用額外的單線程輪循所有的連接,檢測其是否活躍。該種方式在開源框架中有應用。
timerExecutor.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
reconnect();
}
}, SCHEDULE_INTERVAL_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.SECONDS);
使用Java的定時線程池,定時執行重連操作。在重連操作中將檢測連接的活躍性,如果非活躍,則執行重連。
參考
fescar中心跳和重連的源碼
淺析 Netty 實現心跳機制與斷線重連
Netty權威指南-心跳檢測機制和斷連重連
Netty — 心跳檢測和斷線重連