1. 程式人生 > >長連接 、短連接、心跳機制與斷線重連(轉載)

長連接 、短連接、心跳機制與斷線重連(轉載)

http ase 地址 出現異常 好處 失效 能力 shutdown 根據

概述


可承遇到,不知什麽原因,一個夜晚,機房中,大片的遠程調用連接斷開。

第二天早上,用戶訪問高峰,大部分服務器都在獲取連接,造成大片網絡阻塞。

服務崩潰,慘不忍睹的景象。

本文將從長連接和短連接的概念切入,再到長連接與短連接的區別,以及應用場景,引出心跳機制和斷線重連,給出代碼實現。

從原理到實踐杜絕此類現象。

短連接


概念

client與server通過三次握手建立連接,client發送請求消息,server返回響應,一次連接就完成了。

這時候雙方任意都可以發起close操作,不過一般都是client先發起close操作。上述可知,短連接一般只會在 client/server間傳遞一次請求操作。

短連接的優缺點

管理起來比較簡單,存在的連接都是有用的連接,不需要額外的控制手段。

使用場景

通常瀏覽器訪問服務器的時候就是短連接。

對於服務端來說,長連接會耗費服務端的資源,而且用戶用瀏覽器訪問服務端相對而言不是很頻繁的

如果有幾十萬,上百萬的連接,服務端的壓力會非常大,甚至會崩潰。

所以對於並發量大,請求頻率低的,建議使用短連接。

長連接


什麽是長連接

client向server發起連接,server接受client連接,雙方建立連接。

Client與server完成一次讀寫之後,它們之間的連接並不會主動關閉,後續的讀寫操作會繼續使用這個連接。

長連接的生命周期

正常情況下,一條TCP長連接建立後,只要雙不提出關閉請求並且不出現異常情況,這條連接是一直存在的.

操作系統不會自動去關閉它,甚至經過物理網絡拓撲的改變之後仍然可以使用。

所以一條連接保持幾天、幾個月、幾年或者更長時間都有可能,只要不出現異常情況或由用戶(應用層)主動關閉。

客戶端和服務單可一直使用該連接進行數據通信。

長連接的優點

長連接可以省去較多的TCP建立和關閉的操作,減少網絡阻塞的影響,

當發生錯誤時,可以在不關閉連接的情況下進行提示,

減少CPU及內存的使用,因為不需要經常的建立及關閉連接。

長連接的缺點

連接數過多時,影響服務端的性能和並發數量。

使用場景

數據庫的連接就是采用TCP長連接.

RPC,遠程服務調用,在服務器,一個服務進程頻繁調用另一個服務進程,可使用長連接,減少連接花費的時間。

總結

1.對於長連接和短連接的使用是需要根據應用場景來判斷的

2.長連接並不是萬能的,也是需要維護的,

長連接的實現


心跳機制

應用層協議大多都有HeartBeat機制,通常是客戶端每隔一小段時間向服務器發送一個數據包,通知服務器自己仍然在線。

並傳輸一些可能必要的數據。使用心跳包的典型協議是IM,比如QQ/MSN/飛信等協議。

在TCP的機制裏面,本身是存在有心跳包的機制的,也就是TCP的選項:SO_KEEPALIVE。

系統默認是設置的2小時的心跳頻率。但是它檢查不到機器斷電、網線拔出、防火墻這些斷線。

而且邏輯層處理斷線可能也不是那麽好處理。一般,如果只是用於保活還是可以的。

為什麽需要心跳機制?

因為網絡的不可靠性, 有可能在 TCP 保持長連接的過程中, 由於某些突發情況, 例如網線被拔出, 突然掉電等,

會造成服務器和客戶端的連接中斷. 在這些突發情況下, 如果恰好服務器和客戶端之間沒有交互的話, 那麽它們是不能在短時間內發現對方已經掉線的.

心跳機制即可解決此類問題。

TCP協議的KeepAlive機制

默認KeepAlive狀態是不打開的。

需要將setsockopt將SOL_SOCKET.SO_KEEPALIVE設置為1才是打開KeepAlive狀態,

並且可以設置三個參數:

tcp_keepalive_time ,tcp_keepalive_probes , tcp_keepalive_intvl

分別表示:連接閑置多久開始發keepalive的ack包、發幾個ack包不回復才當對方已斷線、兩個ack包之間的間隔。

很多網絡設備,尤其是NAT路由器,由於其硬件的限制(例如內存、CPU處理能力),無法保持其上的所有連接,因此在必要的時候,會在連接池中選擇一些不活躍的連接踢掉。

典型做法是LRU,把最久沒有數據的連接給T掉。

通過使用TCP的KeepAlive機制(修改那個time參數),可以讓連接每隔一小段時間就產生一些ack包,以降低被踢掉的風險,當然,這樣的代價是額外的網絡和CPU負擔。

如何實現心跳機制?

兩種方式實現心跳機制:

  • 使用 TCP 協議層面的 keepalive 機制.

  • 在應用層上實現自定義的心跳機制.

雖然在 TCP 協議層面上, 提供了 keepalive 保活機制, 但是使用它有幾個缺點:

  1. 它不是 TCP 的標準協議, 並且是默認關閉的.

  2. TCP keepalive 機制依賴於操作系統的實現, 默認的 keepalive 心跳時間是 兩個小時, 並且對 keepalive 的修改需要系統調用(或者修改系統配置), 靈活性不夠.

  3. TCP keepalive 與 TCP 協議綁定, 因此如果需要更換為 UDP 協議時, keepalive 機制就失效了.

使用 TCP 層面的 keepalive 機制比自定義的應用層心跳機制節省流量,

本文的主要介紹應用層方面實現心跳機制,使用netty實現心跳和斷線重連。

netty實現心跳機制


netty對心跳機制提供了機制,實現的關鍵是IdleStateHandler先來看一下他的構造函數

    public IdleStateHandler(
            long readerIdleTime, long writerIdleTime, long allIdleTime,
            TimeUnit unit) {
        this(false, readerIdleTime, writerIdleTime, allIdleTime, unit);
    }

實例化一個 IdleStateHandler 需要提供三個參數:

  • readerIdleTimeSeconds, 讀超時. 即當在指定的時間間隔內沒有從 Channel 讀取到數據時, 會觸發一個 READER_IDLE 的 IdleStateEvent 事件.

  • writerIdleTimeSeconds, 寫超時. 即當在指定的時間間隔內沒有數據寫入到 Channel 時, 會觸發一個 WRITER_IDLE 的 IdleStateEvent 事件.

  • allIdleTimeSeconds, 讀和寫都超時. 即當在指定的時間間隔內沒有讀並且寫操作時, 會觸發一個 ALL_IDLE 的 IdleStateEvent 事件.

netty心跳流程

技術分享圖片

1. 客戶端成功連接服務端。

2.在客戶端中的ChannelPipeline中加入IdleStateHandler,設置寫事件觸發事件為5s.

3.客戶端超過5s未寫數據,觸發寫事件,向服務端發送心跳包,

4.同樣,服務端要對心跳包做出響應,其實給客戶端最好的回復就是“不回復”,減輕服務端的壓力

5.超過三次,1過0s服務端都會收到來自客戶端的心跳信息,服務端可以認為客戶端掛了,可以close鏈路。

6.客戶端恢復正常,發現鏈路已斷,重新連接服務端。

代碼實現

服務端handler:

package com.heartbreak.server;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;

import java.util.Random;

/**
 * @author janti
 * @date 2018/6/10 12:21
 */
public class HeartbeatServerHandler extends SimpleChannelInboundHandler<String> {
    // 失敗計數器:未收到client端發送的ping請求
    private int unRecPingTimes = 0;

    // 定義服務端沒有收到心跳消息的最大次數
    private static final int MAX_UN_REC_PING_TIMES = 3;

    private Random random = new Random(System.currentTimeMillis());

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        if (msg!=null && msg.equals("Heartbeat")){
            System.out.println("客戶端"+ctx.channel().remoteAddress()+"--心跳信息--");
        }else {
            System.out.println("客戶端----請求消息----:"+msg);
            String resp = "商品的價格是:"+random.nextInt(1000);
            ctx.writeAndFlush(resp);
        }
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) evt;
            if (event.state()==IdleState.READER_IDLE){
                System.out.println("===服務端===(READER_IDLE 讀超時)");
                // 失敗計數器次數大於等於3次的時候,關閉鏈接,等待client重連
                if (unRecPingTimes >= MAX_UN_REC_PING_TIMES) {
                    System.out.println("===服務端===(讀超時,關閉chanel)");
                    // 連續超過N次未收到client的ping消息,那麽關閉該通道,等待client重連
                    ctx.close();
                } else {
                    // 失敗計數器加1
                    unRecPingTimes++;
                }
            }else {
                super.userEventTriggered(ctx,evt);
            }
        }
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
        System.out.println("一個客戶端已連接");
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        super.channelInactive(ctx);
        System.out.println("一個客戶端已斷開連接");
    }
}

服務端server:

package com.heartbreak.server;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.IdleStateHandler;

import java.util.concurrent.TimeUnit;

/**
 * @author tangj
 * @date 2018/6/10 10:46
 */
public class HeartBeatServer {
    private static int port = 9817;

    public HeartBeatServer(int port) {
        this.port = port;
    }

    ServerBootstrap bootstrap = null;
    ChannelFuture f;

    // 檢測chanel是否接受過心跳數據時間間隔(單位秒)
    private static final int READ_WAIT_SECONDS = 10;

    public static void main(String args[]) {
        HeartBeatServer heartBeatServer = new HeartBeatServer(port);
        heartBeatServer.startServer();
    }

    public void startServer() {
        EventLoopGroup bossgroup = new NioEventLoopGroup();
        EventLoopGroup workergroup = new NioEventLoopGroup();
        try {
            bootstrap = new ServerBootstrap();
            bootstrap.group(bossgroup, workergroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new HeartBeatServerInitializer());
            // 服務器綁定端口監聽
            f = bootstrap.bind(port).sync();
            System.out.println("server start ,port: "+port);
            // 監聽服務器關閉監聽,此方法會阻塞
            f.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            bossgroup.shutdownGracefully();
            workergroup.shutdownGracefully();
        }
    }


    private class HeartBeatServerInitializer extends ChannelInitializer<SocketChannel> {

        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ChannelPipeline pipeline = ch.pipeline();
            // 監聽讀操作,讀超時時間為5秒,超過5秒關閉channel;
            pipeline.addLast("ping", new IdleStateHandler(READ_WAIT_SECONDS, 0, 0, TimeUnit.SECONDS));
            pipeline.addLast("decoder", new StringDecoder());
            pipeline.addLast("encoder", new StringEncoder());

            pipeline.addLast("handler", new HeartbeatServerHandler());
        }
    }

}

客戶端handler:

package com.heartbreak.client;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.EventLoop;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.CharsetUtil;
import io.netty.util.ReferenceCountUtil;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.TimeUnit;

/**
 * @author tangj
 * @date 2018/6/11 22:55
 */
public class HeartBeatClientHandler extends SimpleChannelInboundHandler<String>{
    private HeartBeatClient client;

    private SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:dd");

    private static final ByteBuf HEARTBEAT_SEQUENCE = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("Heartbeat",
            CharsetUtil.UTF_8));

    public HeartBeatClientHandler(HeartBeatClient client) {
        this.client = client;
    }
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        System.out.println("收到服務端回復:"+msg);
        if (msg.equals("Heartbeat")) {
            ctx.write("has read message from server");
            ctx.flush();
        }
        ReferenceCountUtil.release(msg);
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleState state = ((IdleStateEvent) evt).state();
            if (state == IdleState.WRITER_IDLE) {
                ctx.writeAndFlush(HEARTBEAT_SEQUENCE.duplicate());
            }
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        super.channelInactive(ctx);
        System.err.println("客戶端與服務端斷開連接,斷開的時間為:"+format.format(new Date()));
        // 定時線程 斷線重連
        final EventLoop eventLoop = ctx.channel().eventLoop();
        eventLoop.schedule(new Runnable() {
            @Override
            public void run() {
                client.doConncet();
            }
        }, 10, TimeUnit.SECONDS);
    }


}

客戶端啟動:

package com.heartbreak.client;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.IdleStateHandler;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.Random;
import java.util.concurrent.TimeUnit;

/**
 * @author tangj
 * @date 2018/6/10 16:18
 */
public class HeartBeatClient {

    private Random random = new Random();
    public Channel channel;
    public Bootstrap bootstrap;

    protected String host = "127.0.0.1";
    protected int port = 9817;

    public static void main(String args[]) throws Exception {
        HeartBeatClient client = new HeartBeatClient();
        client.run();
        client.sendData();

    }

    public void run() throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            bootstrap = new Bootstrap();
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new SimpleClientInitializer(HeartBeatClient.this));
            doConncet();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 發送數據
     * @throws Exception
     */
    public void sendData() throws Exception {
        BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
        while (true){
            String cmd = in.readLine();
            switch (cmd){
                case "close" :
                    channel.close();
                    break;
                default:
                channel.writeAndFlush(in.readLine());
                    break;
            }
        }
    }

    /**
     * 連接服務端
     */
    public void doConncet() {
        if (channel != null && channel.isActive()) {
            return;
        }
        ChannelFuture channelFuture = bootstrap.connect(host, port);
        channelFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture futureListener) throws Exception {
                if (channelFuture.isSuccess()) {
                    channel = futureListener.channel();
                    System.out.println("connect server successfully");
                } else {
                    System.out.println("Failed to connect to server, try connect after 10s");
                    futureListener.channel().eventLoop().schedule(new Runnable() {
                        @Override
                        public void run() {
                            doConncet();
                        }
                    }, 10, TimeUnit.SECONDS);
                }
            }
        });

    }


    private class SimpleClientInitializer extends ChannelInitializer<SocketChannel> {

        private HeartBeatClient client;

        public SimpleClientInitializer(HeartBeatClient client) {
            this.client = client;
        }

        @Override
        protected void initChannel(SocketChannel socketChannel) throws Exception {
            ChannelPipeline pipeline = socketChannel.pipeline();
            pipeline.addLast(new IdleStateHandler(0, 5, 0));
            pipeline.addLast("encoder", new StringEncoder());
            pipeline.addLast("decoder", new StringDecoder());
            pipeline.addLast("handler", new HeartBeatClientHandler(client));
        }
    }


}

運行結果:

1.客戶端長時間未發送心跳包,服務端關閉連接

server start ,port: 9817
一個客戶端已連接
===服務端===(READER_IDLE 讀超時)
===服務端===(READER_IDLE 讀超時)
===服務端===(READER_IDLE 讀超時)
===服務端===(READER_IDLE 讀超時)
===服務端===(讀超時,關閉chanel)
一個客戶端已斷開連接

2.客戶端發送心跳包,服務端和客戶端保持心跳信息

一個客戶端已連接
客戶端/127.0.0.1:55436--心跳信息--
客戶端/127.0.0.1:55436--心跳信息--
客戶端/127.0.0.1:55436--心跳信息--
客戶端/127.0.0.1:55436--心跳信息--

3.服務單宕機,斷開連接,客戶端進行重連

客戶端與服務端斷開連接,斷開的時間為:2018-06-12 23:47:12
Failed to connect to server, try connect after 10s
Failed to connect to server, try connect after 10s
Failed to connect to server, try connect after 10s
connect server successfully

代碼地址:

LearnTCP

本文首發於個人網站:http://www.janti.cn

參考:

TCP長連接與短連接、心跳機制

Socket的長連接和短連接.

淺析 Netty 實現心跳機制與斷線重連

http://tldp.org/HOWTO/html_single/TCP-Keepalive-HOWTO

長連接 、短連接、心跳機制與斷線重連(轉載)