1. 程式人生 > >長連線 、短連線、心跳機制與斷線重連(轉載) Socket的長連線和短連線

長連線 、短連線、心跳機制與斷線重連(轉載) Socket的長連線和短連線

概述


可承遇到,不知什麼原因,一個夜晚,機房中,大片的遠端呼叫連線斷開。

第二天早上,使用者訪問高峰,大部分伺服器都在獲取連線,造成大片網路阻塞。

服務崩潰,慘不忍睹的景象。

本文將從長連線和短連線的概念切入,再到長連線與短連線的區別,以及應用場景,引出心跳機制和斷線重連,給出程式碼實現。

從原理到實踐杜絕此類現象。 

 

短連線


概念

client與server通過三次握手建立連線,client傳送請求訊息,server返回響應,一次連線就完成了。

這時候雙方任意都可以發起close操作,不過一般都是client先發起close操作。上述可知,短連線一般只會在 client/server間傳遞一次請求操作。

短連線的優缺點

管理起來比較簡單,存在的連線都是有用的連線,不需要額外的控制手段。

使用場景

通常瀏覽器訪問伺服器的時候就是短連線。

對於服務端來說,長連線會耗費服務端的資源,而且使用者用瀏覽器訪問服務端相對而言不是很頻繁的

如果有幾十萬,上百萬的連線,服務端的壓力會非常大,甚至會崩潰。

所以對於併發量大,請求頻率低的,建議使用短連線。

長連線


什麼是長連線

client向server發起連線,server接受client連線,雙方建立連線。

Client與server完成一次讀寫之後,它們之間的連線並不會主動關閉,後續的讀寫操作會繼續使用這個連線。

長連線的生命週期

正常情況下,一條TCP長連線建立後,只要雙不提出關閉請求並且不出現異常情況,這條連線是一直存在的.

作業系統不會自動去關閉它,甚至經過物理網路拓撲的改變之後仍然可以使用。

所以一條連線保持幾天、幾個月、幾年或者更長時間都有可能,只要不出現異常情況或由使用者(應用層)主動關閉。

客戶端和服務單可一直使用該連線進行資料通訊。

長連線的優點

長連線可以省去較多的TCP建立和關閉的操作,減少網路阻塞的影響,

當發生錯誤時,可以在不關閉連線的情況下進行提示,

減少CPU及記憶體的使用,因為不需要經常的建立及關閉連線。

長連線的缺點

連線數過多時,影響服務端的效能和併發數量。

使用場景

資料庫的連線就是採用TCP長連線.

RPC,遠端服務呼叫,在伺服器,一個服務程序頻繁呼叫另一個服務程序,可使用長連線,減少連線花費的時間。

總結

1.對於長連線和短連線的使用是需要根據應用場景來判斷的

2.長連線並不是萬能的,也是需要維護的,

 

長連線的實現


心跳機制

應用層協議大多都有HeartBeat機制,通常是客戶端每隔一小段時間向伺服器傳送一個數據包,通知伺服器自己仍然線上。

並傳輸一些可能必要的資料。使用心跳包的典型協議是IM,比如QQ/MSN/飛信等協議。

 

在TCP的機制裡面,本身是存在有心跳包的機制的,也就是TCP的選項:SO_KEEPALIVE。

系統預設是設定的2小時的心跳頻率。但是它檢查不到機器斷電、網線拔出、防火牆這些斷線。

而且邏輯層處理斷線可能也不是那麼好處理。一般,如果只是用於保活還是可以的。

為什麼需要心跳機制?

因為網路的不可靠性, 有可能在 TCP 保持長連線的過程中, 由於某些突發情況, 例如網線被拔出, 突然掉電等,

會造成伺服器和客戶端的連線中斷. 在這些突發情況下, 如果恰好伺服器和客戶端之間沒有互動的話, 那麼它們是不能在短時間內發現對方已經掉線的.

心跳機制即可解決此類問題。

TCP協議的KeepAlive機制

預設KeepAlive狀態是不開啟的。

需要將setsockopt將SOL_SOCKET.SO_KEEPALIVE設定為1才是開啟KeepAlive狀態,

並且可以設定三個引數:

tcp_keepalive_time  ,tcp_keepalive_probes  , tcp_keepalive_intvl

分別表示:連線閒置多久開始發keepalive的ack包、發幾個ack包不回覆才當對方已斷線、兩個ack包之間的間隔。

很多網路裝置,尤其是NAT路由器,由於其硬體的限制(例如記憶體、CPU處理能力),無法保持其上的所有連線,因此在必要的時候,會在連線池中選擇一些不活躍的連線踢掉。

典型做法是LRU,把最久沒有資料的連線給T掉。

通過使用TCP的KeepAlive機制(修改那個time引數),可以讓連線每隔一小段時間就產生一些ack包,以降低被踢掉的風險,當然,這樣的代價是額外的網路和CPU負擔。

如何實現心跳機制?

兩種方式實現心跳機制:

  • 使用 TCP 協議層面的 keepalive 機制.

  • 在應用層上實現自定義的心跳機制.

雖然在 TCP 協議層面上, 提供了 keepalive 保活機制, 但是使用它有幾個缺點:

  1. 它不是 TCP 的標準協議, 並且是預設關閉的.

  2. TCP keepalive 機制依賴於作業系統的實現, 預設的 keepalive 心跳時間是 兩個小時, 並且對 keepalive 的修改需要系統呼叫(或者修改系統配置), 靈活性不夠.

  3. TCP keepalive 與 TCP 協議繫結, 因此如果需要更換為 UDP 協議時, keepalive 機制就失效了.

使用 TCP 層面的 keepalive 機制比自定義的應用層心跳機制節省流量,

本文的主要介紹應用層方面實現心跳機制,使用netty實現心跳和斷線重連。

netty實現心跳機制


netty對心跳機制提供了機制,實現的關鍵是IdleStateHandler先來看一下他的建構函式

 

 

    public IdleStateHandler(
            long readerIdleTime, long writerIdleTime, long allIdleTime,
            TimeUnit unit) {
        this(false, readerIdleTime, writerIdleTime, allIdleTime, unit);
    }

 

例項化一個 IdleStateHandler 需要提供三個引數:

  • readerIdleTimeSeconds, 讀超時. 即當在指定的時間間隔內沒有從 Channel 讀取到資料時, 會觸發一個 READER_IDLE 的 IdleStateEvent 事件.

  • writerIdleTimeSeconds, 寫超時. 即當在指定的時間間隔內沒有資料寫入到 Channel 時, 會觸發一個 WRITER_IDLE 的 IdleStateEvent 事件.

  • allIdleTimeSeconds, 讀和寫都超時. 即當在指定的時間間隔內沒有讀並且寫操作時, 會觸發一個 ALL_IDLE 的 IdleStateEvent 事件.

 

netty心跳流程

 

1. 客戶端成功連線服務端。

2.在客戶端中的ChannelPipeline中加入IdleStateHandler,設定寫事件觸發事件為5s.

3.客戶端超過5s未寫資料,觸發寫事件,向服務端傳送心跳包,

4.同樣,服務端要對心跳包做出響應,其實給客戶端最好的回覆就是“不回覆”,減輕服務端的壓力

5.超過三次,1過0s服務端都會收到來自客戶端的心跳資訊,服務端可以認為客戶端掛了,可以close鏈路。

6.客戶端恢復正常,發現鏈路已斷,重新連線服務端。

程式碼實現

服務端handler:

package com.heartbreak.server;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;

import java.util.Random;

/**
 * @author janti
 * @date 2018/6/10 12:21
 */
public class HeartbeatServerHandler extends SimpleChannelInboundHandler<String> {
    // 失敗計數器:未收到client端傳送的ping請求
    private int unRecPingTimes = 0;

    // 定義服務端沒有收到心跳訊息的最大次數
    private static final int MAX_UN_REC_PING_TIMES = 3;

    private Random random = new Random(System.currentTimeMillis());

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        if (msg!=null && msg.equals("Heartbeat")){
            System.out.println("客戶端"+ctx.channel().remoteAddress()+"--心跳資訊--");
        }else {
            System.out.println("客戶端----請求訊息----:"+msg);
            String resp = "商品的價格是:"+random.nextInt(1000);
            ctx.writeAndFlush(resp);
        }
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) evt;
            if (event.state()==IdleState.READER_IDLE){
                System.out.println("===服務端===(READER_IDLE 讀超時)");
                // 失敗計數器次數大於等於3次的時候,關閉連結,等待client重連
                if (unRecPingTimes >= MAX_UN_REC_PING_TIMES) {
                    System.out.println("===服務端===(讀超時,關閉chanel)");
                    // 連續超過N次未收到client的ping訊息,那麼關閉該通道,等待client重連
                    ctx.close();
                } else {
                    // 失敗計數器加1
                    unRecPingTimes++;
                }
            }else {
                super.userEventTriggered(ctx,evt);
            }
        }
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
        System.out.println("一個客戶端已連線");
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        super.channelInactive(ctx);
        System.out.println("一個客戶端已斷開連線");
    }
}

服務端server:

package com.heartbreak.server;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.IdleStateHandler;

import java.util.concurrent.TimeUnit;

/**
 * @author tangj
 * @date 2018/6/10 10:46
 */
public class HeartBeatServer {
    private static int port = 9817;

    public HeartBeatServer(int port) {
        this.port = port;
    }

    ServerBootstrap bootstrap = null;
    ChannelFuture f;

    // 檢測chanel是否接受過心跳資料時間間隔(單位秒)
    private static final int READ_WAIT_SECONDS = 10;

    public static void main(String args[]) {
        HeartBeatServer heartBeatServer = new HeartBeatServer(port);
        heartBeatServer.startServer();
    }

    public void startServer() {
        EventLoopGroup bossgroup = new NioEventLoopGroup();
        EventLoopGroup workergroup = new NioEventLoopGroup();
        try {
            bootstrap = new ServerBootstrap();
            bootstrap.group(bossgroup, workergroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new HeartBeatServerInitializer());
            // 伺服器繫結埠監聽
            f = bootstrap.bind(port).sync();
            System.out.println("server start ,port: "+port);
            // 監聽伺服器關閉監聽,此方法會阻塞
            f.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            bossgroup.shutdownGracefully();
            workergroup.shutdownGracefully();
        }
    }


    private class HeartBeatServerInitializer extends ChannelInitializer<SocketChannel> {

        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ChannelPipeline pipeline = ch.pipeline();
            // 監聽讀操作,讀超時時間為5秒,超過5秒關閉channel;
            pipeline.addLast("ping", new IdleStateHandler(READ_WAIT_SECONDS, 0, 0, TimeUnit.SECONDS));
            pipeline.addLast("decoder", new StringDecoder());
            pipeline.addLast("encoder", new StringEncoder());

            pipeline.addLast("handler", new HeartbeatServerHandler());
        }
    }

}

 客戶端handler:

package com.heartbreak.client;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.EventLoop;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.CharsetUtil;
import io.netty.util.ReferenceCountUtil;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.TimeUnit;

/**
 * @author tangj
 * @date 2018/6/11 22:55
 */
public class HeartBeatClientHandler extends SimpleChannelInboundHandler<String>{
    private HeartBeatClient client;

    private SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:dd");

    private static final ByteBuf HEARTBEAT_SEQUENCE = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("Heartbeat",
            CharsetUtil.UTF_8));

    public HeartBeatClientHandler(HeartBeatClient client) {
        this.client = client;
    }
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        System.out.println("收到服務端回覆:"+msg);
        if (msg.equals("Heartbeat")) {
            ctx.write("has read message from server");
            ctx.flush();
        }
        ReferenceCountUtil.release(msg);
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleState state = ((IdleStateEvent) evt).state();
            if (state == IdleState.WRITER_IDLE) {
                ctx.writeAndFlush(HEARTBEAT_SEQUENCE.duplicate());
            }
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        super.channelInactive(ctx);
        System.err.println("客戶端與服務端斷開連線,斷開的時間為:"+format.format(new Date()));
        // 定時執行緒 斷線重連
        final EventLoop eventLoop = ctx.channel().eventLoop();
        eventLoop.schedule(new Runnable() {
            @Override
            public void run() {
                client.doConncet();
            }
        }, 10, TimeUnit.SECONDS);
    }


}

客戶端啟動:

package com.heartbreak.client;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.IdleStateHandler;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.Random;
import java.util.concurrent.TimeUnit;

/**
 * @author tangj
 * @date 2018/6/10 16:18
 */
public class HeartBeatClient {

    private Random random = new Random();
    public Channel channel;
    public Bootstrap bootstrap;

    protected String host = "127.0.0.1";
    protected int port = 9817;

    public static void main(String args[]) throws Exception {
        HeartBeatClient client = new HeartBeatClient();
        client.run();
        client.sendData();

    }

    public void run() throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            bootstrap = new Bootstrap();
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new SimpleClientInitializer(HeartBeatClient.this));
            doConncet();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 傳送資料
     * @throws Exception
     */
    public void sendData() throws Exception {
        BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
        while (true){
            String cmd = in.readLine();
            switch (cmd){
                case "close" :
                    channel.close();
                    break;
                default:
                channel.writeAndFlush(in.readLine());
                    break;
            }
        }
    }

    /**
     * 連線服務端
     */
    public void doConncet() {
        if (channel != null && channel.isActive()) {
            return;
        }
        ChannelFuture channelFuture = bootstrap.connect(host, port);
        channelFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture futureListener) throws Exception {
                if (channelFuture.isSuccess()) {
                    channel = futureListener.channel();
                    System.out.println("connect server successfully");
                } else {
                    System.out.println("Failed to connect to server, try connect after 10s");
                    futureListener.channel().eventLoop().schedule(new Runnable() {
                        @Override
                        public void run() {
                            doConncet();
                        }
                    }, 10, TimeUnit.SECONDS);
                }
            }
        });

    }


    private class SimpleClientInitializer extends ChannelInitializer<SocketChannel> {

        private HeartBeatClient client;

        public SimpleClientInitializer(HeartBeatClient client) {
            this.client = client;
        }

        @Override
        protected void initChannel(SocketChannel socketChannel) throws Exception {
            ChannelPipeline pipeline = socketChannel.pipeline();
            pipeline.addLast(new IdleStateHandler(0, 5, 0));
            pipeline.addLast("encoder", new StringEncoder());
            pipeline.addLast("decoder", new StringDecoder());
            pipeline.addLast("handler", new HeartBeatClientHandler(client));
        }
    }


}

執行結果:

1.客戶端長時間未傳送心跳包,服務端關閉連線

server start ,port: 9817
一個客戶端已連線
===服務端===(READER_IDLE 讀超時)
===服務端===(READER_IDLE 讀超時)
===服務端===(READER_IDLE 讀超時)
===服務端===(READER_IDLE 讀超時)
===服務端===(讀超時,關閉chanel)
一個客戶端已斷開連線

2.客戶端傳送心跳包,服務端和客戶端保持心跳資訊

一個客戶端已連線
客戶端/127.0.0.1:55436--心跳資訊--
客戶端/127.0.0.1:55436--心跳資訊--
客戶端/127.0.0.1:55436--心跳資訊--
客戶端/127.0.0.1:55436--心跳資訊--

3.服務單宕機,斷開連線,客戶端進行重連

客戶端與服務端斷開連線,斷開的時間為:2018-06-12 23:47:12
Failed to connect to server, try connect after 10s
Failed to connect to server, try connect after 10s
Failed to connect to server, try connect after 10s
connect server successfully

 

程式碼地址:

LearnTCP

 本文首發於個人網站:http://www.janti.cn

參考:

TCP長連線與短連線、心跳機制

Socket的長連線和短連線.

淺析 Netty 實現心跳機制與斷線重連

http://tldp.org/HOWTO/html_single/TCP-Keepalive-HOWTO

個人部落格網站 http://www.janti.cn

程式碼地址:

LearnTCP

 本文首發於個人網站:http://www.janti.cn

參考:

TCP長連線與短連線、心跳機制

Socket的長連線和短連線.

淺析 Netty 實現心跳機制與斷線重連

http://tldp.org/HOWTO/html_single/TCP-Keepalive-HOWTO