[netty]-消息編解碼之Java原生序列化

分類:編程 時間:2017-02-03

消息對象在網絡上傳輸時,我們往往要對消息進行編解碼。現在編解碼技術非常多,包括Java原生的序列化、Google的protoBuf、hessian等等。這一篇博客我們主要介紹Java的原生序列化編解碼以及其優缺點。

基於Java提供的對象輸入/輸出流 ObjectInputStream和ObjectOutputStream可以直接將對象序列化為可存儲的字節數組寫入文件或則是在網絡上傳輸。Java對象的序列化目的一般只有兩個:

  1. 網絡傳輸;
  2. 對象持久化。

1. Java序列化的缺點

Java序列化從JDK1.1 就已經提供,不需要添加其他的任何依賴庫就可以實現,只需要實現Serializable接口,並生成唯一序列化ID就可以序列化。但是Java原生序列化有諸多的缺點,導致在實際的生產環境中基本都是用其余開源的編解碼框架。下面我們來看看Java原生的序列化有什麽缺點:

(1) 無法跨語言:這也是最致命的缺陷,導致無法再異構系統中使用。

(2)序列化之後碼流太大:序列化之後占用太大的字節。

(3)序列化性能太低:序列化比較占用CPU的時間。

2.業界主流的編解碼框架:

由於Java序列化的表現差強人意,所以業界推出了很多高性能的編解碼框架,比如Google的Protobuf, hessian等等。這篇博文主要說明Java原生序列化的應用。

3. netty中使用Java序列化編解碼

在不考慮跨語言調用和性能時,Java序列化任然是首選的機制,因為原生的API使用起來十分方便。這一節主要包括:

  1. netty序列化服務端編程
  2. netty序列化客戶端編程
  3. 應用實例

1. 服務端編程

應用場景:客戶端發送用戶訂購請求消息,然後服務端接收到請求消息對象後處理,然後發送響應對象給客戶端。請求消息和效應消息的對象定義如下:
request:

package netty.quanwei.p7;

import java.io.Serializable;

/**
 * Created by louyuting on 17/2/1.
 *
 */
public class SubscribeReq implements Serializable{
    private String messageID;

    private String userName;

    private String productName;

    private String phone;

    private String address;

    public String getAddress() {
        return address;
    }

    public void setAddress(String address) {
        this.address = address;
    }

    public String getMessageID() {
        return messageID;
    }

    public void setMessageID(String messageID) {
        this.messageID = messageID;
    }

    public String getPhone() {
        return phone;
    }

    public void setPhone(String phone) {
        this.phone = phone;
    }

    public String getProductName() {
        return productName;
    }

    public void setProductName(String productName) {
        this.productName = productName;
    }

    public String getUserName() {
        return userName;
    }

    public void setUserName(String userName) {
        this.userName = userName;
    }
}
@Override
public String toString() {
    return "SubscribeReq: [messageID]:"+ messageID + " [userName]:" +userName
            + " [productName]:" +productName+ " [phone]:" +phone+ " [address]:" +address;
}

response:

package netty.quanwei.p7;

import java.io.Serializable;

/**
 * Created by louyuting on 17/2/1.
 */
public class SubscribeResp implements Serializable {
    private String messageID;

    private String respCode;

    private String description;

    public String getDescription() {
        return description;
    }

    public void setDescription(String description) {
        this.description = description;
    }

    public String getMessageID() {
        return messageID;
    }

    public void setMessageID(String messageID) {
        this.messageID = messageID;
    }

    public String getRespCode() {
        return respCode;
    }

    public void setRespCode(String respCode) {
        this.respCode = respCode;
    }
}
@Override
public String toString() {
    return "SubscribeReq: [messageID]:"+ messageID + " [respCode]:" +respCode
            + " [description]:" +description;
}

這裏我們使用netty提供的ObjectDecoder和ObjectEncoder來進行編解碼。

服務端的主函數編碼如下:

package netty.quanwei.p7;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;

/**
 * Created by louyuting on 17/1/31.
 */
public class SubreqServer {

    public void bind(int port) throws Exception{
        //配置服務端NIO 線程組
        EventLoopGroup boss = new NioEventLoopGroup();
        EventLoopGroup worker = new NioEventLoopGroup();

        ServerBootstrap server = new ServerBootstrap();

        try {
            server.group(boss, worker)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 100)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            /**
                             * 解碼器: 構造器傳入了兩個參數: #1 單個對象序列化後最大字節長度,這是設置是1M;
                             *                           #2 類解析器: weakCachingConcurrentResolver創建線程安全的WeakReferenceMa對類加載器進行緩存,
                             *                                      支持多線程並發訪問,當虛擬機內存不足時,會釋放緩存中的內存,防止內存泄漏.
                             */
                            ch.pipeline().addLast(new ObjectDecoder(1024*1024,
                                    ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader())) );

                            ch.pipeline().addLast(new ObjectEncoder());

                            ch.pipeline().addLast(new SubreqServerHandler());
                        }
                    });

            //綁定端口, 同步等待成功
            ChannelFuture future = server.bind(port).sync();

            //等待服務端監聽端口關閉
            future.channel().closeFuture().sync();
        } finally {
            //優雅關閉 線程組
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }

    /**
     * main 函數
     * @param args
     */
    public static void main(String[] args) {
        SubreqServer server = new SubreqServer();
        try {
            server.bind(18888);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

在handler容器中增加了一個ObjectDecoder,負責對實現了序列化接口的POJO進行解碼,它有多個構造函數,支持不同的類解析器。這裏我們使用weakCachingConcurrentResolver創建線程安全的WeakReferenceMa對類加載器進行緩存,支持多線程並發訪問,當虛擬機內存不足時,會釋放緩存中的內存,防止內存泄漏。 此外還設置了單個對象序列化後最大字節長度,這是設置是1M。

最後訂購消息在SubreqServerHandler中處理:

package netty.quanwei.p7;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import utils.LogUtil;

/**
 * Created by louyuting on 17/1/31.
 */
public class SubreqServerHandler extends ChannelInboundHandlerAdapter{
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        LogUtil.log_debug("Server -> read");

        SubscribeReq req = (SubscribeReq)msg;

        if( "louyuting".equalsIgnoreCase(req.getUserName()) ){
            system.out.println("service accept client subscript req :[\n"+ req.toString() +"]");

            ctx.writeAndFlush( resp(req.getMessageID()) );
        }
    }

    private SubscribeResp resp(String reqID){
        SubscribeResp response = new SubscribeResp();
        response.setMessageID(reqID);
        response.setRespCode("0");
        response.setDescription("subscribe is success book will arrive after 3 days");
        return response;
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        LogUtil.log_debug("Server -> read complete");
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        //釋放資源
        ctx.close();
    }
}

客戶端程序開發

客戶端核心代碼思路是:
1)在客戶端將netty提供的編解碼器添加到ChannelPipeline

2)在客戶端鏈路被激活的時候發送10條訂購消息,為了檢驗netty提供的Java序列化功能是否支持TCP的黏包/拆包功能,客戶端一次性構造10條訂購消息並一次性發送給服務器,看服務器能夠成功反序列化。

3) 客戶端接收到服務端的反饋消息,打印。

客戶端核心代碼:

package netty.quanwei.p7;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;

/**
 * Created by louyuting on 17/1/31.
 * netty 時間服務器 客戶端
 */
public class SubreqClient {

    public void connect(int port, String host) throws Exception{
        //配置客戶端NIO 線程組
        EventLoopGroup group = new NioEventLoopGroup();

        Bootstrap client = new Bootstrap();

        try {
            client.group(group)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new ObjectDecoder(1024,
                                    ClassResolvers.cacheDisabled(this.getClass().getClassLoader())) );

                            ch.pipeline().addLast(new ObjectEncoder());
                            ch.pipeline().addLast(new SubreqClientHandler());
                        }
                    });

            //綁定端口, 異步連接操作
            ChannelFuture future = client.connect(host, port).sync();

            //等待客戶端連接端口關閉
            future.channel().closeFuture().sync();
        } finally {
            //優雅關閉 線程組
            group.shutdownGracefully();
        }
    }

    /**
     * main 函數
     * @param args
     */
    public static void main(String[] args) {
        SubreqClient client = new SubreqClient();
        try {
            client.connect(18888, "127.0.0.1");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

在ObjectDecoder構造函數中配置不允許對類加載器緩存。下面再看SubreqClientHandler的實現:

package netty.quanwei.p7;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import utils.LogUtil;

/**
 * Created by louyuting on 17/1/31.
 */
public class SubreqClientHandler extends ChannelInboundHandlerAdapter{

    public SubreqClientHandler() {
    }

    /**
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        LogUtil.log_debug("client -> active");

        for(int i=0; i<10; i++){
            ctx.write(subReq(String.valueOf(i)));
        }

        // 寫入10個對象到發送緩沖之後再一次性 flush寫入通道
        ctx.flush();
    }


    private SubscribeReq subReq(String id){
        SubscribeReq req = new SubscribeReq();
        req.setMessageID(id);
        req.setUserName("louyuting");
        req.setProductName("iPhone 7");
        req.setPhone("13026317652");
        req.setAddress("HUST");

        return req;
    }


    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        LogUtil.log_debug("client -> read");


        LogUtil.log_debug("receive server response: { " + msg.toString() +"]");

    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

在鏈路激活時,即channelActive()函數中一次性構建10條消息,然後一次發送給服務器。

運行結果

整個運行流程就是:
客戶端先構建消息對象–》

在客戶端與服務端連接上時,一次性發送10個對象給服務端,這裏發送時在客戶端會經過ObjectEncoder編碼為字節數據–》

服務端接收到字節數組,先經過ObjectDecoder解碼成為實際的Java對象(這裏的ObjectDecoder需要傳入類解析器參數,類解析器參數也要傳入加載器參數),然後服務端的handler處理後染回response對象給客戶端,返回response對象在服務端也會被ObjectEncoder編碼—》

客戶端收到響應,也是先解碼,然後回去響應消息,並處理。

以上基本上就是完整的流程,我們看看運行結果:
服務端:

2017-02-01 12:41:17Server -> read
service accept client subscript req :[
SubscribeReq: [messageID]:0 [userName]:louyuting [productName]:iphone 7 [phone]:13026317652 [address]:HUST]
2017-02-01 12:41:17Server -> read
service accept client subscript req :[
SubscribeReq: [messageID]:1 [userName]:louyuting [productName]:iphone 7 [phone]:13026317652 [address]:HUST]
2017-02-01 12:41:17Server -> read
service accept client subscript req :[
SubscribeReq: [messageID]:2 [userName]:louyuting [productName]:iphone 7 [phone]:13026317652 [address]:HUST]
2017-02-01 12:41:17Server -> read
service accept client subscript req :[
SubscribeReq: [messageID]:3 [userName]:louyuting [productName]:iphone 7 [phone]:13026317652 [address]:HUST]
2017-02-01 12:41:17Server -> read
service accept client subscript req :[
SubscribeReq: [messageID]:4 [userName]:louyuting [productName]:iphone 7 [phone]:13026317652 [address]:HUST]
2017-02-01 12:41:17Server -> read
service accept client subscript req :[
SubscribeReq: [messageID]:5 [userName]:louyuting [productName]:iphone 7 [phone]:13026317652 [address]:HUST]
2017-02-01 12:41:17Server -> read
service accept client subscript req :[
SubscribeReq: [messageID]:6 [userName]:louyuting [productName]:iphone 7 [phone]:13026317652 [address]:HUST]
2017-02-01 12:41:17Server -> read
service accept client subscript req :[
SubscribeReq: [messageID]:7 [userName]:louyuting [productName]:iphone 7 [phone]:13026317652 [address]:HUST]
2017-02-01 12:41:17Server -> read
service accept client subscript req :[
SubscribeReq: [messageID]:8 [userName]:louyuting [productName]:iphone 7 [phone]:13026317652 [address]:HUST]
2017-02-01 12:41:17Server -> read
service accept client subscript req :[
SubscribeReq: [messageID]:9 [userName]:louyuting [productName]:iphone 7 [phone]:13026317652 [address]:HUST]
2017-02-01 12:41:17Server -> read complete

收到了完整的10個對象:盡管客戶端一次批量發送了10條訂購請求消息,TCP會對消息進行拆包和黏包,但是並不影響最後的運行結果,服務端成功接收到了10條請求訂購的消息,與客戶端一致。

客戶端運行結果如下:

2017-02-01 12:41:17:client -> active
2017-02-01 12:41:17:client -> read
2017-02-01 12:41:17:receive server response: { SubscribeReq: [messageID]:0 [respCode]:0 [description]:subscribe is success book will arrive after 3 days]
2017-02-01 12:41:17:client -> read
2017-02-01 12:41:17:receive server response: { SubscribeReq: [messageID]:1 [respCode]:0 [description]:subscribe is success book will arrive after 3 days]
2017-02-01 12:41:17:client -> read
2017-02-01 12:41:17:receive server response: { SubscribeReq: [messageID]:2 [respCode]:0 [description]:subscribe is success book will arrive after 3 days]
2017-02-01 12:41:17:client -> read
2017-02-01 12:41:17:receive server response: { SubscribeReq: [messageID]:3 [respCode]:0 [description]:subscribe is success book will arrive after 3 days]
2017-02-01 12:41:17:client -> read
2017-02-01 12:41:17:receive server response: { SubscribeReq: [messageID]:4 [respCode]:0 [description]:subscribe is success book will arrive after 3 days]
2017-02-01 12:41:17:client -> read
2017-02-01 12:41:17:receive server response: { SubscribeReq: [messageID]:5 [respCode]:0 [description]:subscribe is success book will arrive after 3 days]
2017-02-01 12:41:17:client -> read
2017-02-01 12:41:17:receive server response: { SubscribeReq: [messageID]:6 [respCode]:0 [description]:subscribe is success book will arrive after 3 days]
2017-02-01 12:41:17:client -> read
2017-02-01 12:41:17:receive server response: { SubscribeReq: [messageID]:7 [respCode]:0 [description]:subscribe is success book will arrive after 3 days]
2017-02-01 12:41:17:client -> read
2017-02-01 12:41:17:receive server response: { SubscribeReq: [messageID]:8 [respCode]:0 [description]:subscribe is success book will arrive after 3 days]
2017-02-01 12:41:17:client -> read
2017-02-01 12:41:17:receive server response: { SubscribeReq: [messageID]:9 [respCode]:0 [description]:subscribe is success book will arrive after 3 days]

客戶端也收到了10條反饋信息。

本文完整代碼的github地址:
https://github.com/leetcode-hust/leetcode/tree/master/louyuting/src/netty/quanwei/p7


Tags: Google 優缺點 服務端 客戶端 Java

文章來源:


ads
ads

相關文章
ads

相關文章

ad