1. 程式人生 > >一起學Netty(十四)之 Netty生產級的心跳和重連機制

一起學Netty(十四)之 Netty生產級的心跳和重連機制

sigh,寫這篇部落格的時候老臉還是紅了一下,心裡還是有些唏噓的,應該算是剽竊吧,每個人的程式碼功力的確是有差距的,好在文章的標題是“一起學”,而不是開濤大神的“跟我學”系列的文章,我們還是多花點時間學習吧,感嘆無用~

最近工作比較忙,但閒暇之餘還是看了阿里的馮家春(fengjiachun)的github上的開原始碼Jupiter,寫的RPC框架讓我感嘆人外有人,廢話不多說,下面的程式碼全部擷取自Jupiter,寫了一個比較完整的例子,供大家一起學習分享,再次對@Luca抱拳,Jupiter的Github地址:

https://github.com/fengjiachun/Jupiter

今天研究的是,心跳和重連,雖然這次是大神寫的程式碼,但是萬變不離其宗,我們先回顧一下Netty應用心跳和重連的整個過程:

1)客戶端連線服務端

2)在客戶端的的ChannelPipeline中加入一個比較特殊的IdleStateHandler,設定一下客戶端的寫空閒時間,例如5s

3)當客戶端的所有ChannelHandler中4s內沒有write事件,則會觸發userEventTriggered方法(上文介紹過)

4)我們在客戶端的userEventTriggered中對應的觸發事件下發送一個心跳包給服務端,檢測服務端是否還存活,防止服務端已經宕機,客戶端還不知道

5)同樣,服務端要對心跳包做出響應,其實給客戶端最好的回覆就是“不回覆”,這樣可以服務端的壓力,假如有10w個空閒Idle的連線,那麼服務端光傳送心跳回復,則也是費事的事情,那麼怎麼才能告訴客戶端它還活著呢,其實很簡單,因為5s服務端都會收到來自客戶端的心跳資訊,那麼如果10秒內收不到,服務端可以認為客戶端掛了,可以close鏈路

6)加入服務端因為什麼因素導致宕機的話,就會關閉所有的鏈路連結,所以作為客戶端要做的事情就是短線重連

以上描述的就是整個心跳和重連的整個過程,雖然很簡單,上一篇blog也寫了一個Demo,簡單地做了一下上述功能

要寫工業級的Netty心跳重連的程式碼,需要解決一下幾個問題:

1)ChannelPipeline中的ChannelHandlers的維護,首次連線和重連都需要對ChannelHandlers進行管理

2)重連物件的管理,也就是bootstrap物件的管理

3)重連機制編寫

完整的程式碼:https://github.com/BazingaLyn/netty-study/tree/master/src/main/java/com/lyncc/netty/idle

下面我們就看大神是如何解決這些問題的,首先先定義一個介面ChannelHandlerHolder,用來保管ChannelPipeline中的Handlers的

package com.lyncc.netty.idle;

import io.netty.channel.ChannelHandler;

/**
 * 
 * 客戶端的ChannelHandler集合,由子類實現,這樣做的好處:
 * 繼承這個介面的所有子類可以很方便地獲取ChannelPipeline中的Handlers
 * 獲取到handlers之後方便ChannelPipeline中的handler的初始化和在重連的時候也能很方便
 * 地獲取所有的handlers
 */
public interface ChannelHandlerHolder {

    ChannelHandler[] handlers();
}
我們再來編寫我們熟悉的服務端的ServerBootstrap的編寫:

HeartBeatServer.java

package com.lyncc.netty.idle;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.IdleStateHandler;

import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;

public class HeartBeatServer {
    
    private final AcceptorIdleStateTrigger idleStateTrigger = new AcceptorIdleStateTrigger();
    
    private int port;

    public HeartBeatServer(int port) {
        this.port = port;
    }

    public void start() {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap sbs = new ServerBootstrap().group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class).handler(new LoggingHandler(LogLevel.INFO))
                    .localAddress(new InetSocketAddress(port)).childHandler(new ChannelInitializer<SocketChannel>() {
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new IdleStateHandler(5, 0, 0, TimeUnit.SECONDS));
                            ch.pipeline().addLast(idleStateTrigger);
                            ch.pipeline().addLast("decoder", new StringDecoder());
                            ch.pipeline().addLast("encoder", new StringEncoder());
                            ch.pipeline().addLast(new HeartBeatServerHandler());
                        };

                    }).option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, true);
            // 繫結埠,開始接收進來的連線
            ChannelFuture future = sbs.bind(port).sync();

            System.out.println("Server start listen at " + port);
            future.channel().closeFuture().sync();
        } catch (Exception e) {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        int port;
        if (args.length > 0) {
            port = Integer.parseInt(args[0]);
        } else {
            port = 8080;
        }
        new HeartBeatServer(port).start();
    }

}
單獨寫一個AcceptorIdleStateTrigger,其實也是繼承ChannelInboundHandlerAdapter,重寫userEventTriggered方法,因為客戶端是write,那麼服務端自然是read,設定的狀態就是IdleState.READER_IDLE,原始碼如下:
package com.lyncc.netty.idle;

import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;


@ChannelHandler.Sharable
public class AcceptorIdleStateTrigger extends ChannelInboundHandlerAdapter {

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleState state = ((IdleStateEvent) evt).state();
            if (state == IdleState.READER_IDLE) {
                throw new Exception("idle exception");
            }
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }
}
HeartBeatServerHandler就是一個很簡單的自定義的Handler,不是重點:
package com.lyncc.netty.idle;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class HeartBeatServerHandler extends ChannelInboundHandlerAdapter {


    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("server channelRead..");
        System.out.println(ctx.channel().remoteAddress() + "->Server :" + msg.toString());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

}

接下來就是重點,我們需要寫一個類,這個類可以去觀察鏈路是否斷了,如果斷了,進行迴圈的斷線重連操作,ConnectionWatchdog,顧名思義,鏈路檢測狗,我們先看完整程式碼:

package com.lyncc.netty.idle;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.util.Timeout;
import io.netty.util.Timer;
import io.netty.util.TimerTask;

import java.util.concurrent.TimeUnit;

/**
 * 
 * 重連檢測狗,當發現當前的鏈路不穩定關閉之後,進行12次重連
 */
@Sharable
public abstract class ConnectionWatchdog extends ChannelInboundHandlerAdapter implements TimerTask ,ChannelHandlerHolder{
    
    
    
    private final Bootstrap bootstrap;
    private final Timer timer;
    private final int port;
    
    private final String host;

    private volatile boolean reconnect = true;
    private int attempts;
    
    
    public ConnectionWatchdog(Bootstrap bootstrap, Timer timer, int port,String host, boolean reconnect) {
        this.bootstrap = bootstrap;
        this.timer = timer;
        this.port = port;
        this.host = host;
        this.reconnect = reconnect;
    }
    
    /**
     * channel鏈路每次active的時候,將其連線的次數重新☞ 0
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        
        System.out.println("當前鏈路已經激活了,重連嘗試次數重新置為0");
        
        attempts = 0;
        ctx.fireChannelActive();
    }
    
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("連結關閉");
        if(reconnect){
            System.out.println("連結關閉,將進行重連");
            if (attempts < 12) {
                attempts++;
                //重連的間隔時間會越來越長
                int timeout = 2 << attempts;
                timer.newTimeout(this, timeout, TimeUnit.MILLISECONDS);
            }
        }
        ctx.fireChannelInactive();
    }
    

    public void run(Timeout timeout) throws Exception {
        
        ChannelFuture future;
        //bootstrap已經初始化好了,只需要將handler填入就可以了
        synchronized (bootstrap) {
            bootstrap.handler(new ChannelInitializer<Channel>() {

                @Override
                protected void initChannel(Channel ch) throws Exception {
                    
                    ch.pipeline().addLast(handlers());
                }
            });
            future = bootstrap.connect(host,port);
        }
        //future物件
        future.addListener(new ChannelFutureListener() {

            public void operationComplete(ChannelFuture f) throws Exception {
                boolean succeed = f.isSuccess();

                //如果重連失敗,則呼叫ChannelInactive方法,再次出發重連事件,一直嘗試12次,如果失敗則不再重連
                if (!succeed) {
                    System.out.println("重連失敗");
                    f.channel().pipeline().fireChannelInactive();
                }else{
                    System.out.println("重連成功");
                }
            }
        });
        
    }

}


稍微分析一下:

1)繼承了ChannelInboundHandlerAdapter,說明它也是Handler,也對,作為一個檢測物件,肯定會放在鏈路中,否則怎麼檢測

2)實現了2個介面,TimeTask,ChannelHandlerHolder

   ①TimeTask,我們就要寫run方法,這應該是一個定時任務,這個定時任務做的事情應該是重連的工作

   ②ChannelHandlerHolder的介面,這個介面我們剛才說過是維護的所有的Handlers,因為在重連的時候需要獲取Handlers

3)bootstrap物件,重連的時候依舊需要這個物件

4)當鏈路斷開的時候會觸發channelInactive這個方法,也就說觸發重連的導火索是從這邊開始的

好了,我們這邊再寫次核心的HeartBeatsClient的程式碼:

package com.lyncc.netty.idle;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.HashedWheelTimer;

import java.util.concurrent.TimeUnit;

public class HeartBeatsClient {
    
    protected final HashedWheelTimer timer = new HashedWheelTimer();
    
    private Bootstrap boot;
    
    private final ConnectorIdleStateTrigger idleStateTrigger = new ConnectorIdleStateTrigger();

    public void connect(int port, String host) throws Exception {
        
        EventLoopGroup group = new NioEventLoopGroup();  
        
        boot = new Bootstrap();
        boot.group(group).channel(NioSocketChannel.class).handler(new LoggingHandler(LogLevel.INFO));
            
        final ConnectionWatchdog watchdog = new ConnectionWatchdog(boot, timer, port,host, true) {

                public ChannelHandler[] handlers() {
                    return new ChannelHandler[] {
                            this,
                            new IdleStateHandler(0, 4, 0, TimeUnit.SECONDS),
                            idleStateTrigger,
                            new StringDecoder(),
                            new StringEncoder(),
                            new HeartBeatClientHandler()
                    };
                }
            };
            
            ChannelFuture future;
            //進行連線
            try {
                synchronized (boot) {
                    boot.handler(new ChannelInitializer<Channel>() {

                        //初始化channel
                        @Override
                        protected void initChannel(Channel ch) throws Exception {
                            ch.pipeline().addLast(watchdog.handlers());
                        }
                    });

                    future = boot.connect(host,port);
                }

                // 以下程式碼在synchronized同步塊外面是安全的
                future.sync();
            } catch (Throwable t) {
                throw new Exception("connects to  fails", t);
            }
    }

    /**
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        int port = 8080;
        if (args != null && args.length > 0) {
            try {
                port = Integer.valueOf(args[0]);
            } catch (NumberFormatException e) {
                // 採用預設值
            }
        }
        new HeartBeatsClient().connect(port, "127.0.0.1");
    }

}
也稍微說明一下:

1)建立了ConnectionWatchdog物件,自然要實現handlers方法

2)初始化好bootstrap物件

3)4秒內沒有寫操作,進行心跳觸發,也就是IdleStateHandler這個方法

最後ConnectorIdleStateTrigger這個類

package com.lyncc.netty.idle;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.CharsetUtil;

@Sharable
public class ConnectorIdleStateTrigger extends ChannelInboundHandlerAdapter {
    
    private static final ByteBuf HEARTBEAT_SEQUENCE = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("Heartbeat",
            CharsetUtil.UTF_8));

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleState state = ((IdleStateEvent) evt).state();
            if (state == IdleState.WRITER_IDLE) {
                // write heartbeat to server
                ctx.writeAndFlush(HEARTBEAT_SEQUENCE.duplicate());
            }
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }
}
HeartBeatClientHandler.java(不是重點)
package com.lyncc.netty.idle;

import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;

import java.util.Date;

@Sharable
public class HeartBeatClientHandler extends ChannelInboundHandlerAdapter {

    
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("啟用時間是:"+new Date());
        System.out.println("HeartBeatClientHandler channelActive");
        ctx.fireChannelActive();
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("停止時間是:"+new Date());
        System.out.println("HeartBeatClientHandler channelInactive");
    }


    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        String message = (String) msg;
        System.out.println(message);
        if (message.equals("Heartbeat")) {
            ctx.write("has read message from server");
            ctx.flush();
        }
        ReferenceCountUtil.release(msg);
    }
}

好了,到此為止,所有的程式碼都貼完了,我們做一個簡單的測試,按照常理,如果不出任何狀況的話,客戶端4秒傳送心跳,服務端5秒才驗證是不會斷連的,所以我們在啟動之後,關閉服務端,然後再次重啟服務端

首先啟動服務端,控制檯如下:

啟動客戶端,控制檯如下:

客戶端啟動之後,服務端的控制檯:

關閉服務端後,客戶端控制檯:

重啟啟動服務端:

重連成功~

相關推薦

一起Netty Netty生產心跳機制

sigh,寫這篇部落格的時候老臉還是紅了一下,心裡還是有些唏噓的,應該算是剽竊吧,每個人的程式碼功力的確是有差距的,好在文章的標題是“一起學”,而不是開濤大神的“跟我學”系列的文章,我們還是多花點時間學習吧,感嘆無用~ 最近工作比較忙,但閒暇之餘還是看了阿里的馮家春(fe

一起Netty Netty心跳簡單Demo

前面簡單地瞭解了一下IdleStateHandler,我們現在寫一個簡單的心跳demo: 1)伺服器端每隔5秒檢測伺服器端的讀超時,如果5秒沒有接受到客戶端的寫請求,也就說伺服器端5秒沒有收到讀事件,則視為一次超時 2)如果超時二次則說明連線處於不活躍的狀態,關閉Serve

opencv學習影象顏色通道分離融合

在影象處理時,我們接觸到的彩色以RGB居多,為了分析影象在某一通道上的特性,需要將影象的顏色通道進行分離,或者是在對某一顏色通道處理後重新進行融合。opencv提供了split()函式來進行顏色通道的分離,提供了merge()函式來進行顏色通道的融合。 1.s

MySQL數據備份與還原

pic 還原 mysql .com ace sql .cn uid 數據備份 http://pic.cnhubei.com/space.php?uid=4614&do=album&id=1719909http://pic.cnhubei.com/space.

Git+Jenkins學習自動化指令碼部署實踐

一、環境說明和準備 1、環境說明 主機名 IP地址 角色 系統 deploy-server 192.168.56.12 釋出 Centos 7.4 web 192.1

《大話設計模式》Java程式碼示例備忘錄模式

備忘錄模式(Memonto):在不破壞封裝性的前提下,捕獲一個物件的內部狀態,並在該物件之外儲存這個狀態,這樣以後就可將該物件恢復到原先儲存的狀態。 package memento; /** * 備忘錄模式(Memento) * 遊戲角色 */ public

Java框架springMVC的註解開發

一、註解入門 1.配置springMVC配置檔案 <!-- 添加註解掃描 --> <context:component-scan base-package="com.qf"></context:component-scan>

跟我一起寫 Makefile

使用make更新函式庫檔案——————————— 函式庫檔案也就是對Object檔案(程式編譯的中間檔案)的打包檔案。在Unix下,一般是由命令"ar"來完成打包工作。 一、函式庫檔案的成員 一個函式庫檔案由多個檔案組成。你可以以如下格式指定函式庫檔案及其組成:    

Python爬蟲從入門到放棄 Scrapy框架中選擇器的用法

esp 技術分享 val arr con des image 使用 自己 原文地址https://www.cnblogs.com/zhaof/p/7189860.html Scrapy提取數據有自己的一套機制,被稱作選擇器(selectors),通過特定的Xpath或者CS

Python爬蟲從入門到放棄 Scrapy框架的架構原理

執行 持久 pip 下載 響應 .py example 數據模型 特殊 原文地址https://www.cnblogs.com/zhaof/p/7173397.html 這一篇文章主要是為了對scrapy框架的工作流程以及各個組件功能的介紹 Scrapy目前已經可以很好的在

Java燒腦驢遊--流(Stream)、檔案(File)IO

Java.io包幾乎包含了所有操作輸入、輸出需要的類。所有這些流類代表了輸入源和輸出目標。 Java.io包中的流支援很多種格式,比如:基本型別、物件、本地化字符集等等。 一個流可以理解為一個數據的序列。輸入流表示從一個源讀取資料,輸出流表示向一個目標寫資料

Netty Netty生產心跳機制

分享一下我老師大神的人工智慧教程!零基礎,通俗易懂!http://blog.csdn.net/jiangjunshow 也歡迎大家轉載本篇文章。分享知識,造福人民,實現我們中華民族偉大復興!        

一起Nettynetty原始碼學習netty server端原始碼初讀

server端是使用了Reactor模式對nio進行了一些封裝,Reactor模式網上有很多資料,不贅述,瞭解了這個模式開始看原始碼 netty的版本是4.0.21.Final <dependency> <groupId>io.netty<

一起Nettynetty原始碼學習大話java NIO

沉澱了一個月安安心心地學習了家純大神的Jupiter(https://github.com/fengjiachun/Jupiter),感覺受益良多,感覺自己學習了這裡面的精華的50%,不是謙虛,而是無知,因為我不知道著裡面還有多少是我沒有理解的,也許我看懂了他的程式碼,但我

一起python-opencv影象閾值化,影象縮放

影象閾值化也可以叫做二值化,其實我們前面已經用過了很多次的cv2.threshold,另外就是cv2.inRange,這個主要用HSV顏色空間來分離出某一種顏色的區域。前面我們只用了幾種閾值化的型別,那麼這篇文章的開頭,就讓我們來認識一下其它的閾值化型別。 我

全棧JavaScriptHTML5 中與class屬性相關的擴充

mov html5 表示 方法 popu dom add data- token 1. getElementByClassName() :支持getElementsByClassName()方法的瀏覽器有IE 9+、Firefox 3+、Safari

C邏輯運算符

C語言 && || ! 我們在 C 語言中經常會遇到邏輯運算符。|| 是從左向右開始計算的,當遇到為真的條件時停止計算,整個表達式為真;所有條件為假時表達式才為假。 && 是從左向右開始計算,當遇到為假的條件時停止計算,整個表達式為假;所有條件為真時表達式才為真

Spark學習SparkCore的調優資源調優JVM的GC垃圾收集器

當前 復制 event 只需要 引用 應用 之前 相互 分享 一、概述 垃圾收集 Garbage Collection 通常被稱為“GC”,它誕生於1960年 MIT 的 Lisp 語言,經過半個多世紀,目前已經十分成熟了。 jvm 中,程序計數

機器學習numpymatplotlib學習

今天繼續來講numpy中的一些基本函式使用。 #!/usr/bin/env python # -*- coding: utf-8 -*- # @Author : SundayCoder-俊勇 # @File : numpy6.py import numpy as np # n

Python小白學習—【作用域】【匿名函式】【程式設計方法論】【高階函式】

    吧啦吧啦內心戲 在沒有具體學作用域之前,我在之前的學習筆記中就有提到 我開始以為是自己自創的詞兒 沒想到這個詞早已經存在(手動捂臉) 真是個無知的小火鍋(不知者無罪) 我發現自己最擅長做的事情,就是給自己找個臺階,然後很快順勢滑下來 一、作用域 先來一段程式碼分析一波吧