1. 程式人生 > >《netty權威指南》之模擬伺服器之間的心跳檢測

《netty權威指南》之模擬伺服器之間的心跳檢測

在叢集環境下伺服器之間是要定時進行心跳檢測的,那麼netty可以用來做這件事,
在叢集環境中,選定一臺服務區做master,其餘的做salve
即master <==>  server端
   salve   <==>  客戶端
客戶端定時像服務端傳送請求,當然在請求之間先進行認證

服務端程式碼如下

package com.lyzx.netty.netty06;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.timeout.ReadTimeoutHandler;

/**
 * @author hero.li
 * 
 */
public class Server{

    public static void main(String[] args) throws InterruptedException {
        //開啟兩個執行緒組,一個用於接受客戶端的請求   另一個用於非同步的網路IO的讀寫
        NioEventLoopGroup bossGroup = new NioEventLoopGroup();
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();

        //Netty啟動的輔助類 裡面封裝了客戶端和服務端的連結以及如何處理選擇器 selector等邏輯
        ServerBootstrap b = new ServerBootstrap();

        //傳入兩個執行緒組,設定傳輸塊大小為1k,
//新增ServerHandler型別的過濾器(表示如何處理這些訊息,過濾器當然要整合netty的一個介面)
        b.group(bossGroup,workerGroup)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG,1024)
                .childOption(ChannelOption.SO_KEEPALIVE,Boolean.TRUE)
                .childHandler(new ChannelInitializer<SocketChannel>(){
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception{
                        ChannelHandler[] arr = {MarshallingCodeCFactory.marshallingDecoder(),
                                                MarshallingCodeCFactory.marshallingEncoder(),
                                                new ReadTimeoutHandler(30),
                                                new ServerHandler()};
                        ch.pipeline().addLast(arr);
                    }
                });

        //同步等待繫結埠結束
        ChannelFuture f = b.bind(9988).sync();
        //等待服務端監聽埠關閉
        f.channel().closeFuture().sync();
        //優雅的關閉執行緒組
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
    }
}

服務端Handler

package com.lyzx.netty.netty06;

import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;


/**
 * 對於網事件做讀寫,通常只要關注channelRead()和exceptionCaught()即可
 */
public class ServerHandler extends ChannelInboundHandlerAdapter {
    private static final String AUTH_SUCCESS_FLAG = "AUTH_SUCCESS";
    private static final String AUTH_FAIL_FLAG = "AUTH_FAIL";

    private static final Map<String,String> KEYS = new ConcurrentHashMap<>();
    static{
        //這兒本應該讀取資料庫以初始化可以訪問該伺服器的客戶端
        KEYS.put("192.168.22.170","abcd007");
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if(msg.getClass() == String.class){
            //剛建立連線時的握手資訊
            String ipAndSuffix = String.valueOf(msg);
            String[] ipAndSuffiArr = ipAndSuffix.split("_");
            String ip = ipAndSuffiArr[0];
            String suffix  = ipAndSuffiArr[1];
            System.out.println("ip:"+ip+"   ,  "+suffix);
            if(KEYS.containsKey(ip)){
                if(suffix.equals(KEYS.get(ip))){
                    ctx.channel().writeAndFlush(AUTH_SUCCESS_FLAG);
                    return;
                }
            }
            ctx.channel()
               .writeAndFlush(AUTH_FAIL_FLAG)
               .addListener(ChannelFutureListener.CLOSE);
        }else{
            System.out.println("server:channelRead____通道可讀開始");
            NettyRequest nr = (NettyRequest)msg;
            System.out.println("server:收到的訊息____:"+nr);

            String datetime = LocalDateTime.now()
                .format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss:SSS"));
            nr.setMsg(datetime);
            ctx.channel().writeAndFlush(nr);
            System.out.println("server:channelRead____通道可讀結束");
        }
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        System.out.println("server:channelReadComplete____通道可讀完成 ");
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("server:exceptionCaught____發生異常");
        ctx.close();
    }

}

客戶端程式碼

package com.lyzx.netty.netty06;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.timeout.ReadTimeoutHandler;


public class Client {

    public static void main(String[] args) throws InterruptedException {
        NioEventLoopGroup group = new NioEventLoopGroup();
        Bootstrap b = new Bootstrap();
        b.group(group)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY,true)
                .handler(new ChannelInitializer<SocketChannel>(){
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception{
                            ChannelHandler[] arr = 
            
                {MarshallingCodeCFactory.marshallingDecoder(),                             
                 MarshallingCodeCFactory.marshallingEncoder(),
                 new ReadTimeoutHandler(30),
                 new ClientHandler()};
                            ch.pipeline().addLast(arr);
                    }
                });
        ChannelFuture f = b.connect("127.0.0.1", 9988).sync();

        f.channel().closeFuture().sync();
        group.shutdownGracefully();
    }
}

客戶端Handler

package com.lyzx.netty.netty06;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.net.InetAddress;

public class ClientHandler extends ChannelInboundHandlerAdapter {
    private static final String AUTH_SUCCESS_FLAG = "AUTH_SUCCESS";
    private static final String AUTH_FAIL_FLAG = "AUTH_FAIL";
    private String auth_suffix = "abcd007";

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("client:channelActive____通道啟用開始");
        String ip = InetAddress.getLocalHost().getHostAddress();
        String auth_key = ip+"_"+auth_suffix;
        ctx.channel().writeAndFlush(auth_key);
        System.out.println("client:channelActive____通道啟用結束");
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx,Object msg) throws Exception {
        if(msg.getClass() == String.class){
            if(AUTH_SUCCESS_FLAG.equals(msg)){
                new Thread(new Scheduler(ctx)).start();
            }else{
                System.out.println("========認證失敗:"+AUTH_FAIL_FLAG);
            }
        }else{
            NettyRequest nr = (NettyRequest)msg;
            System.out.println("client____response time:"+nr);
        }
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        System.out.println("client:通道可讀完成");
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("client:發生異常");

    }
}

class Scheduler implements Runnable{
    private  ChannelHandlerContext ctx;

    public Scheduler(ChannelHandlerContext ctx){
        this.ctx = ctx;
    }

    @Override
    public void run() {
//模擬定時傳送心跳訊息
        for(int i=0;i<20;i++){
            NettyRequest nr = new NettyRequest();
            nr.setId((long)i);
            nr.setCode(i);
            nr.setMsg("data_data:"+i);
            ctx.channel().writeAndFlush(nr);
            try {Thread.sleep(2000);
            }catch(InterruptedException e){e.printStackTrace();}
        }
    }
}

其他工具類

package com.lyzx.netty.netty06;

import io.netty.handler.codec.marshalling.*;
import org.jboss.marshalling.MarshallerFactory;
import org.jboss.marshalling.Marshalling;
import org.jboss.marshalling.MarshallingConfiguration;

/**
 * Marshalling工廠
 * @author(alienware)
 * @since 2014-12-16
 */
public final class MarshallingCodeCFactory {

    /**
     * 建立Jboss Marshalling解碼器MarshallingDecoder
     * @return MarshallingDecoder
     */
    public static MarshallingDecoder marshallingDecoder() {
    	//首先通過Marshalling工具類的方法獲取Marshalling例項物件 
        //引數serial標識建立的是java序列化工廠物件。
		final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
		//建立了MarshallingConfiguration物件,配置了版本號為5 
		final MarshallingConfiguration configuration = new MarshallingConfiguration();
		configuration.setVersion(5);
		//根據marshallerFactory和configuration建立provider
		UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration);
		//構建Netty的MarshallingDecoder物件,
        //倆個引數分別為provider和單個訊息序列化後的最大長度
		MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024 * 1024 * 1);
		return decoder;
    }

    /**
     * 建立Jboss Marshalling編碼器MarshallingEncoder
     * @return MarshallingEncoder
     */
    public static MarshallingEncoder marshallingEncoder() {
		final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
		final MarshallingConfiguration configuration = new MarshallingConfiguration();
		configuration.setVersion(5);
		MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration);
		//構建Netty的MarshallingEncoder物件,
        //MarshallingEncoder用於實現序列化介面的POJO物件序列化為二進位制陣列
		MarshallingEncoder encoder = new MarshallingEncoder(provider);
		return encoder;
    }
}

通訊實體類

package com.lyzx.netty.netty06;

import java.io.Serializable;

public class NettyRequest implements Serializable {
    private Long id;

    private int code;
    private String msg;

    public Long getId() {
        return id;
    }

    public void setId(Long id) {
        this.id = id;
    }

    public int getCode() {
        return code;
    }

    public void setCode(int code) {
        this.code = code;
    }

    public String getMsg() {
        return msg;
    }

    public void setMsg(String msg) {
        this.msg = msg;
    }

    @Override
    public String toString() {
        return "NettyRequest{" +
                "id=" + id +
                ", code=" + code +
                ", msg='" + msg + '\'' +
                '}';
    }

}