1. 程式人生 > >Netty 編解碼技術 數據通信和心跳監控案例

Netty 編解碼技術 數據通信和心跳監控案例

base encode lean java語言 附件 rep baidu 類比 syn

Netty 編解碼技術 數據通信和心跳監控案例

多臺服務器之間在進行跨進程服務調用時,需要使用特定的編解碼技術,對需要進行網絡傳輸的對象做編碼和解碼操作,以便完成遠程調用。Netty提供了完善,易擴展,易使用的編解碼技術。本章除了介紹Marshalling的使用,還會基於編解碼技術實現數據通信和心跳檢測案例。通過本章,你將學到Java序列化的優缺點,主流編解碼框架的特點,模擬特殊長連接通信,心跳監控案例。還在等什麽,豐滿的知識等你來拿!

技術:編解碼,數據通信,心跳監控
說明:github上有完整代碼,部分文字描述摘錄《Netty權威指南》
源碼:https://github.com/ITDragonBlog/daydayup/tree/master/Netty/netty-stu

編解碼

Netty 的一大亮點就是使用簡單,將常用的功能和API進行了很好的封裝,編解碼也不例外。針對編解碼功能,Netty提供了通用的編解碼框架常用的編解碼類庫,方便用戶擴展和使用。從而降低用戶的工作量和開發門檻。在io.netty.handler.codec目錄下找到很多預置的編解碼功能.
其實在上一章的知識點中,就已經使用了Netty的編解碼技術,如:DelimiterBasedFrameDecoder,FixedLengthFrameDecoder,StringDecoder

什麽是編解碼技術

編碼(Encode)也稱序列化(serialization),將對象序列化為字節數組,用於網絡傳輸、數據持久化等用途。

解碼(Decode)也稱反序列化(deserialization),把從網絡、磁盤等讀取的字節數組還原成原始對象,以方便後續的業務邏輯操作。

主流編解碼框架

Java序列化

Java序列化使用簡單開發難度低。只需要實現java.io.Serializable接口並生成序列化ID,這個類就能夠通過java.io.ObjectInput序列化和java.io.ObjectOutput反序列化。
但它也有存在很多缺點 :
1 無法跨語言(java的序列化是java語言內部的私有協議,其他語言並不支持),
2 序列化後碼流太大(采用二進制編解碼技術要比java原生的序列化技術強),
3 序列化性能太低

JBoss的Marshalling

JBoss的Marshalling是一個Java對象的序列化API包,修正了JDK自帶序列化包的很多問題,又兼容java.io.Serializable接口;同時可通過工廠類進行參數和特性地配置。
1) 可插拔的類解析器,提供更加便捷的類加載定制策略,通過一個接口即可實現定制;
2) 可插拔的對象替換技術,不需要通過繼承的方式;
3) 可插拔的預定義類緩存表,可以減小序列化的字節數組長度,提升常用類型的對象序列化性能;
4) 無須實現java.io.Serializable接口,即可實現Java序列化;
5) 通過緩存技術提升對象的序列化性能。
6) 使用範圍小,通用性較差。

Google的Protocol Buffers

Protocol Buffers由谷歌開源而來。將數據結構以 .proto 文件進行描述,通過代碼生成工具可以生成對應數據結構的POJO對象和Protobuf相關的方法和屬性。
1) 結構化數據存儲格式(XML,JSON等);
2) 高效的編解碼性能;
3) 平臺無關、擴展性好;
4) 官方支持Java、C++和Python三種語言。

MessagePack框架

MessagePack是一個高效的二進制序列化格式。和JSON一樣跨語言交換數據。但是它比JSON更快、更小(It‘s like JSON.but fast and small)。
1) 高效的編解碼性能;
2) 跨語言;
3) 序列化後碼流小;

Marshalling 配置工廠

package com.itdragon.marshalling;
import io.netty.handler.codec.marshalling.DefaultMarshallerProvider;
import io.netty.handler.codec.marshalling.DefaultUnmarshallerProvider;
import io.netty.handler.codec.marshalling.MarshallerProvider;
import io.netty.handler.codec.marshalling.MarshallingDecoder;
import io.netty.handler.codec.marshalling.MarshallingEncoder;
import io.netty.handler.codec.marshalling.UnmarshallerProvider;
import org.jboss.marshalling.MarshallerFactory;  
import org.jboss.marshalling.Marshalling;  
import org.jboss.marshalling.MarshallingConfiguration; 

public final class ITDragonMarshallerFactory {
    private static final String NAME = "serial"; // serial表示創建的是 Java序列化工廠對象.由jboss-marshalling-serial提供 
    private static final Integer VERSION = 5;  
    private static final Integer MAX_OBJECT_SIZE = 1024 * 1024 * 1; // 單個對象最大長度 
    /** 
     * 創建Jboss Marshalling 解碼器MarshallingDecoder 
     */  
    public static MarshallingDecoder buildMarshallingDecoder() {  
        // step1 通過工具類 Marshalling,獲取Marshalling實例對象,參數serial 標識創建的是java序列化工廠對象  
        final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory(NAME);  
        // step2 初始化Marshalling配置  
        final MarshallingConfiguration configuration = new MarshallingConfiguration();  
        // step3 設置Marshalling版本號  
        configuration.setVersion(VERSION);  
        // step4 初始化生產者  
        UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration);  
        // step5 通過生產者和單個消息序列化後最大長度構建 Netty的MarshallingDecoder  
        MarshallingDecoder decoder = new MarshallingDecoder(provider, MAX_OBJECT_SIZE);  
        return decoder;  
    }  
    /** 
     * 創建Jboss Marshalling 編碼器MarshallingEncoder 
     */  
    public static MarshallingEncoder builMarshallingEncoder() {  
        final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory(NAME);  
        final MarshallingConfiguration configuration = new MarshallingConfiguration();  
        configuration.setVersion(VERSION);  
        MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration);  
        MarshallingEncoder encoder = new MarshallingEncoder(provider);  
        return encoder;  
    }  
}

數據通信

一個網絡應用最重要的工作莫過於數據的傳輸。兩臺機器之間如何建立連接才能提高服務器利用率,減輕服務器的壓力。這都是我們值得去考慮的問題。

常見的三種通信模式

1) 長連接:服務器和客戶端的通道一直處於開啟狀態。合適服務器性能好,客戶端數量少的場景。
2) 短連接:只有在發送數據時建立連接,數據發送完後斷開連接。一般將數據保存在本地,根據某種邏輯一次性批量提交。適合對實時性不高的應用場景。
3) 特殊長連接:它擁有長連接的特性。當在服務器指定時間內,若沒有任何通信,連接就會斷開。若客戶端再次向服務端發送請求,則需重新建立連接。主要為減小服務端資源占用。
本章重點介特殊長連接。它的設計思想在很多場景中都有,比如QQ的離開狀態,電腦的休眠狀態。既保證了用戶的正常使用,又減輕了服務器的壓力。是實際開發中比較常用的通信模式。
它有三個情況:
一、服務器和客戶端的通道一直處於開啟狀態。
二、指定時間內沒有通信則斷開連接。
三、客戶端重新發起請求,則重新建立連接。

結合上面的Marshalling 配置工廠,模擬特殊長連接的通信場景。
客戶端代碼,輔助啟動類Bootstrap,配置編解碼事件,超時事件,自定義事件。客戶端發送請求分兩中情況,通信連接時請求和連接斷開後請求。上一章有詳細的配置說明

package com.itdragon.marshalling;
import java.io.File;
import java.io.FileInputStream;
import java.util.concurrent.TimeUnit;
import com.itdragon.utils.ITDragonUtil;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.timeout.ReadTimeoutHandler;

public class ITDragonClient {
    
    private static final Integer PORT = 8888;
    private static final String HOST = "127.0.0.1";
    private EventLoopGroup group = null;
    private Bootstrap bootstrap = null;
    private ChannelFuture future = null;
    
    private static class SingletonHolder {
        static final ITDragonClient instance = new ITDragonClient();
    }
    public static ITDragonClient getInstance(){
        return SingletonHolder.instance;
    }
    public ITDragonClient() {
        group = new NioEventLoopGroup();
        bootstrap = new Bootstrap();
        try {
            bootstrap.group(group)
            .channel(NioSocketChannel.class)
            .handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel socketChannel) throws Exception {
                    socketChannel.pipeline().addLast(ITDragonMarshallerFactory.buildMarshallingDecoder());  // 配置編碼器
                    socketChannel.pipeline().addLast(ITDragonMarshallerFactory.builMarshallingEncoder());   // 配置解碼器
                    socketChannel.pipeline().addLast(new ReadTimeoutHandler(5));    // 表示5秒內沒有連接後斷開
                    socketChannel.pipeline().addLast(new ITDragonClientHandler());
                }
            })
            .option(ChannelOption.SO_BACKLOG, 1024);
        } catch (Exception e) {
            e.printStackTrace();
        } 
    }
    public void connect(){
        try {
            future = bootstrap.connect(HOST, PORT).sync();
            System.out.println("連接遠程服務器......");                
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    public ChannelFuture getChannelFuture(){
        if(this.future == null || !this.future.channel().isActive()){
            this.connect();
        }
        return this.future;
    }
    /**
     * 特殊長連接:
     * 1. 服務器和客戶端的通道一直處於開啟狀態,
     * 2. 在服務器指定時間內,沒有任何通信,則斷開,
     * 3. 客戶端再次向服務端發送請求則重新建立連接,
     * 4. 從而減小服務端資源占用壓力。
     */
    public static void main(String[] args) {
        final ITDragonClient client = ITDragonClient.getInstance();
        try {
            ChannelFuture future = client.getChannelFuture();
            // 1. 服務器和客戶端的通道一直處於開啟狀態,
            for(Long i = 1L; i <= 3L; i++ ){
                ITDragonReqData reqData = new ITDragonReqData();
                reqData.setId(i);
                reqData.setName("ITDragon-" + i);
                reqData.setRequestMsg("NO." + i + " Request");
                future.channel().writeAndFlush(reqData);
                TimeUnit.SECONDS.sleep(2); // 2秒請求一次,服務器是5秒內沒有請求則會斷開連接
            }
            // 2. 在服務器指定時間內,沒有任何通信,則斷開,
            Thread.sleep(6000);
            // 3. 客戶端再次向服務端發送請求則重新建立連接,
            new Thread(new Runnable() {
                public void run() {
                    try {
                        System.out.println("喚醒......");
                        ChannelFuture cf = client.getChannelFuture();
                        System.out.println("連接是否活躍  : " + cf.channel().isActive());
                        System.out.println("連接是否打開  : " + cf.channel().isOpen());
                        ITDragonReqData reqData = new ITDragonReqData();
                        reqData.setId(4L);
                        reqData.setName("ITDragon-picture");
                        reqData.setRequestMsg("斷開的通道被喚醒了!!!!");
                        // 路徑path自定義
                        String path = System.getProperty("user.dir") + File.separatorChar + 
                            "sources" + File.separatorChar + "itdragon.jpg";  
                        File file = new File(path);  
                        FileInputStream inputStream = new FileInputStream(file);  
                        byte[] data = new byte[inputStream.available()];  
                        inputStream.read(data);  
                        inputStream.close();  
                        reqData.setAttachment(ITDragonUtil.gzip(data));
                        cf.channel().writeAndFlush(reqData);                    
                        cf.channel().closeFuture().sync();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }).start();
            future.channel().closeFuture().sync();
            System.out.println("斷開連接,主線程結束.....");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

客戶端自定義事務代碼,負責將服務器返回的數據打印出來

package com.itdragon.marshalling;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;

public class ITDragonClientHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("Netty Client active ^^^^^^");
    }
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        try {
            ITDragonRespData responseData = (ITDragonRespData) msg;
            System.out.println("Netty Client : " + responseData.toString());
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            ReferenceCountUtil.release(msg);
        }
    }
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

服務器代碼,輔助啟動類ServerBootstrap,配置日誌打印事件,編解碼事件,超時控制事件,自定義事件。

package com.itdragon.marshalling;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.ReadTimeoutHandler; 

public class ITDragonServer {

    private static final Integer PORT = 8888;
    public static void main(String[] args) {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
                .handler(new LoggingHandler(LogLevel.INFO))
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline().addLast(ITDragonMarshallerFactory.buildMarshallingDecoder());  // 配置解碼器
                        socketChannel.pipeline().addLast(ITDragonMarshallerFactory.builMarshallingEncoder());   // 配置編碼器
                        socketChannel.pipeline().addLast(new ReadTimeoutHandler(5)); // 傳入的參數單位是秒,表示5秒內沒有連接後斷開
                        socketChannel.pipeline().addLast(new ITDragonServerHandler());
                    }
                })
                .option(ChannelOption.SO_BACKLOG, 1024)
                .childOption(ChannelOption.SO_KEEPALIVE, true);

            ChannelFuture future = bootstrap.bind(PORT).sync();
            future.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
}

服務器自定義事件代碼,負責接收客戶端傳輸的數據,若有附件則下載到receive目錄下(這裏只是簡單的下載邏輯)。

package com.itdragon.marshalling;
import java.io.File;
import java.io.FileOutputStream;
import com.itdragon.utils.ITDragonUtil;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;  

public class ITDragonServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("Netty Server active ......");
    }
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        try {
            // 獲取客戶端傳來的數據
            ITDragonReqData requestData = (ITDragonReqData) msg;
            System.out.println("Netty Server : " + requestData.toString());
            // 處理數據並返回給客戶端
            ITDragonRespData responseData = new ITDragonRespData();
            responseData.setId(requestData.getId());
            responseData.setName(requestData.getName() + "-SUCCESS!");
            responseData.setResponseMsg(requestData.getRequestMsg() + "-SUCCESS!");
            // 如果有附件則保存附件
            if (null != requestData.getAttachment()) {
                byte[] attachment = ITDragonUtil.ungzip(requestData.getAttachment());
                String path = System.getProperty("user.dir") + File.separatorChar + "receive" + 
                        File.separatorChar + System.currentTimeMillis() + ".jpg";
                FileOutputStream outputStream = new FileOutputStream(path);
                outputStream.write(attachment);
                outputStream.close();
                responseData.setResponseMsg("file upload success , file path is : " + path);
            }
            ctx.writeAndFlush(responseData);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            ReferenceCountUtil.release(msg);
        }
    }
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

ITDragonReqData 和 ITDragonRespData 實體類的代碼就不貼出來了,github上有源碼。

心跳監控案例

在分布式,集群系統架構中,我們需要定時獲取各機器的資源使用情況和服務器之間是否保持正常連接狀態。以便能在最短的時間內避免和處理問題。類比集群中的哨兵模式。

獲取本機數據

可以通過第三方sigar.jar的幫助,獲取主機的運行時信息,包括操作系統、CPU使用情況、內存使用情況、硬盤使用情況以及網卡、網絡信息。使用很簡單,根據自己電腦的系統選擇對應的dll文件,然後拷貝到C:\Windows\System32 目錄下即可。比如windows7 64位操作系統,則需要sigar-amd64-winnt.dll文件。
下載路徑:https://pan.baidu.com/s/1jJSaucI 密碼: 48d2

ITDragonClient.java,ITDragonCoreParam.java,ITDragonRequestInfo.java,ITDragonServer.java,ITDragonSigarUtil.java,pom.xml 的代碼這裏就不貼出來了,github上面有完整的源碼。

客戶端自定義事件代碼,負責發送認證信息,定時向服務器發送cpu信息和內存信息。

package com.itdragon.monitoring;
import java.net.InetAddress;
import java.util.HashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.hyperic.sigar.CpuPerc;
import org.hyperic.sigar.Mem;
import org.hyperic.sigar.Sigar;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;  
  
public class ITDragonClientHandler extends ChannelInboundHandlerAdapter{  
      
    private ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);  
    private ScheduledFuture<?> heartBeat;  
    private InetAddress addr ;  //主動向服務器發送認證信息  
    @Override  
    public void channelActive(ChannelHandlerContext ctx) throws Exception {  
        System.out.println("Client 連接一開通就開始驗證.....");  
        addr = InetAddress.getLocalHost();  
        String ip = addr.getHostAddress();  
        System.out.println("ip : " + ip);
        String key = ITDragonCoreParam.SALT_KEY.getValue(); // 假裝進行了很復雜的加鹽加密  
        // 按照Server端的格式,傳遞令牌   
        String auth = ip + "," + key;   
        ctx.writeAndFlush(auth);  
    }  
    @Override  
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {  
        try {  
            if(msg instanceof String){  
                String result = (String) msg;  
                if(ITDragonCoreParam.AUTH_SUCCESS.getValue().equals(result)){  
                    // 驗證成功,每隔10秒,主動發送心跳消息  
                    this.heartBeat = this.scheduler.scheduleWithFixedDelay(new HeartBeatTask(ctx), 0, 10, TimeUnit.SECONDS);  
                    System.out.println(msg);                  
                }  
                else {  
                    System.out.println(msg);  
                }  
            }  
        } finally {  
            ReferenceCountUtil.release(msg);  
        }  
    }  
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {  
        cause.printStackTrace();  
        if (heartBeat != null) {  
            heartBeat.cancel(true);  
            heartBeat = null;  
        }  
        ctx.fireExceptionCaught(cause);  
    }  
}  
class HeartBeatTask implements Runnable {  
    private final ChannelHandlerContext ctx;  
    public HeartBeatTask(final ChannelHandlerContext ctx) {  
        this.ctx = ctx;  
    }  
    public void run() {  
        try {  
            // 采用sigar 獲取本機數據,放入實體類中  
            ITDragonRequestInfo info = new ITDragonRequestInfo();  
            info.setIp(InetAddress.getLocalHost().getHostAddress()); // ip  
            Sigar sigar = new Sigar();  
            
            CpuPerc cpuPerc = sigar.getCpuPerc();  
            HashMap<String, Object> cpuPercMap = new HashMap<String, Object>();  
            cpuPercMap.put(ITDragonCoreParam.COMBINED.getValue(), cpuPerc.getCombined());  
            cpuPercMap.put(ITDragonCoreParam.USER.getValue(), cpuPerc.getUser());  
            cpuPercMap.put(ITDragonCoreParam.SYS.getValue(), cpuPerc.getSys());  
            cpuPercMap.put(ITDragonCoreParam.WAIT.getValue(), cpuPerc.getWait());  
            cpuPercMap.put(ITDragonCoreParam.IDLE.getValue(), cpuPerc.getIdle());  
            
            Mem mem = sigar.getMem();  
            HashMap<String, Object> memoryMap = new HashMap<String, Object>();  
            memoryMap.put(ITDragonCoreParam.TOTAL.getValue(), mem.getTotal() / 1024L);  
            memoryMap.put(ITDragonCoreParam.USED.getValue(), mem.getUsed() / 1024L);  
            memoryMap.put(ITDragonCoreParam.FREE.getValue(), mem.getFree() / 1024L);  
            info.setCpuPercMap(cpuPercMap);  
            info.setMemoryMap(memoryMap);  
            ctx.writeAndFlush(info);  
        } catch (Exception e) {  
            e.printStackTrace();  
        }  
    }  
}  

服務器自定義事件代碼,負責接收客戶端傳輸的數據,驗證令牌是否失效,打印客戶端傳來的數據。

package com.itdragon.monitoring;
import java.util.HashMap;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class ITDragonServerHandler extends ChannelInboundHandlerAdapter {

    // 令牌驗證的map,key為ip地址,value為密鑰
    private static HashMap<String, String> authMap = new HashMap<String, String>();
    // 模擬數據庫查詢
    static {
        authMap.put("xxx.xxx.x.x", "xxx");
        authMap.put(ITDragonCoreParam.CLIENT_HOST.getValue(), ITDragonCoreParam.SALT_KEY.getValue());
    }
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("Netty Server Monitoring.......");
    }
    // 模擬api請求前的驗證
    private boolean auth(ChannelHandlerContext ctx, Object msg) {
        System.out.println("令牌驗證...............");
        String[] ret = ((String) msg).split(",");
        String clientIp = ret[0];   // 客戶端ip地址
        String saltKey = ret[1];    // 數據庫保存的客戶端密鑰
        String auth = authMap.get(clientIp); // 客戶端傳來的密鑰
        if (null != auth && auth.equals(saltKey)) {
            ctx.writeAndFlush(ITDragonCoreParam.AUTH_SUCCESS.getValue());
            return true;
        } else {
            ctx.writeAndFlush(ITDragonCoreParam.AUTH_ERROR.getValue()).addListener(ChannelFutureListener.CLOSE);
            return false;
        }
    }
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // 如果傳來的消息是字符串,則先驗證
        if (msg instanceof String) {
            auth(ctx, msg);
        } else if (msg instanceof ITDragonRequestInfo) {
            ITDragonRequestInfo info = (ITDragonRequestInfo) msg;
            System.out.println("--------------------------------------------");
            System.out.println("當前主機ip為: " + info.getIp());
            HashMap<String, Object> cpu = info.getCpuPercMap();
            System.out.println("cpu 總使用率: " + cpu.get(ITDragonCoreParam.COMBINED.getValue()));
            System.out.println("cpu 用戶使用率: " + cpu.get(ITDragonCoreParam.USER.getValue()));
            System.out.println("cpu 系統使用率: " + cpu.get(ITDragonCoreParam.SYS.getValue()));
            System.out.println("cpu 等待率: " + cpu.get(ITDragonCoreParam.WAIT.getValue()));
            System.out.println("cpu 空閑率: " + cpu.get(ITDragonCoreParam.IDLE.getValue()));

            HashMap<String, Object> memory = info.getMemoryMap();
            System.out.println("內存總量: " + memory.get(ITDragonCoreParam.TOTAL.getValue()));
            System.out.println("當前內存使用量: " + memory.get(ITDragonCoreParam.USED.getValue()));
            System.out.println("當前內存剩余量: " + memory.get(ITDragonCoreParam.FREE.getValue()));
            System.out.println("--------------------------------------------");

            ctx.writeAndFlush("info received!");
        } else {
            ctx.writeAndFlush("connect failure!").addListener(ChannelFutureListener.CLOSE);
        }
    }
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

總結

1 Netty的編解碼功能很好的解決了Java序列化 無法跨語言,序列化後碼流太大,序列化性能太低等問題
2 JBoss的Marshalling是一個Java對象的序列化API包,修正了JDK自帶的序列化包的很多問題,又兼容java.io.Serializable接口,缺點使用範圍小。
3 特殊長連接可以減小服務端資源占用壓力,是一種比較常用的數據通信方式。
4 Netty可以用做心跳監測,定時獲取被監聽機器的數據信息。

推薦文檔

Netty 能做什麽?學Netty有什麽用?
https://www.zhihu.com/question/24322387
http://blog.csdn.net/broadview2006/article/details/46041995
Marshalling :
http://jbossmarshalling.jboss.org/

Netty 編解碼數據通信和心跳監控案例到這裏就結束了,感謝大家的閱讀,歡迎點評。如果你覺得不錯,可以"推薦"一下。也可以"關註"我,獲得更多豐富的知識。

Netty 編解碼技術 數據通信和心跳監控案例