1. 程式人生 > >netty傳輸物件使用protostuff實現序列化操作

netty傳輸物件使用protostuff實現序列化操作

序列化

序列化即是將java物件轉為二進位制資料流,在網路中的資料傳輸就要實現資料的序列化和反序列化。

實現序列化可以使用JDK自帶的方式:實現Serializable介面即可,操作很簡單。
但是這種方式的確定就是效率很低。
所以,這時我們可以使用一些第三方的序列化方式提高效率,這裡使用protostuff

首先匯入依賴

<dependency>
     <groupId>com.google.protobuf</groupId>
     <artifactId>protobuf-java</artifactId>
     <version>2.5.0</version>
 </dependency>

實體

public class Person {
    String name;
    int id;
    String email;
}

proto描述檔案的編寫

語法
3.5的語法跟以前的protobuf語法有很大的變化,詳細的可以參考官方文件,
地址:https://developers.google.com/protocol-buffers/docs/proto3
這裡有一箇中文翻譯的文件
地址:http://blog.csdn.net/u011518120/article/details/54604615

重點注意的地方有這麼幾點:句法申明、資料型別

客戶端

public class Client {

    public static class ProtoBufClient {
        public void connect(int port, String host) throws Exception {
            // 配置客戶端NIO執行緒組
            EventLoopGroup group = new NioEventLoopGroup();
            try {
                Bootstrap b = new Bootstrap();
                b.group(group)
                        .channel(NioSocketChannel.class)
                        .option(ChannelOption.TCP_NODELAY, true)
                        .handler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            public void initChannel(SocketChannel ch)
                                    throws Exception {

                                // 用來新增報文長度欄位
                                ch.pipeline().addLast(
                                        new ProtobufVarint32LengthFieldPrepender());

                                //新增 ProtobufEncoder 進行序列化將實體類編碼為位元組
                                ch.pipeline().addLast(new ProtobufEncoder());

                                //新增自己的業務Handler
                                ch.pipeline().addLast(
                                        new ProtoBufClientHandler());
                            }
                        });

                ChannelFuture f = b.connect(host, port).sync();
                f.channel().closeFuture().sync();
            } finally {
                // 優雅退出,釋放NIO執行緒組
                group.shutdownGracefully();
            }
        }

        public static void main(String[] args) throws Exception {
            int port = 8080;
            new ProtoBufClient().connect(port, "127.0.0.1");
        }
    }


}

客戶端處理器

public class ProtoBufClientHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        System.out.println("準備生成資料===========>");

        //生成實體類
        PersonProto.Person.Builder builder = PersonProto.Person.newBuilder();
        builder.setName("CemB");
        builder.setId(1);
        builder.setEmail("
[email protected]
"); System.out.println("傳送資料===========>" + builder.getName()); //寫往服務端,由編碼器進行編碼 ctx.writeAndFlush(builder.build()); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }

服務端

public class ProtoBufServer {
    public void bind(int port) throws Exception {
        // 配置服務端的NIO執行緒組
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    //初始化服務端可連線佇列為100個
                    .option(ChannelOption.SO_BACKLOG, 100)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) {

                            //新增 ProtobufVarint32FrameDecoder 以分離資料幀
                            ch.pipeline().addLast(
                                    new ProtobufVarint32FrameDecoder());

                            //新增 ProtobufDecoder 反序列化將位元組解碼為實體
                            ch.pipeline().addLast(new ProtobufDecoder(
                                    PersonProto.Person.getDefaultInstance()
                            ));

                            //新增自己業務Handler
                            ch.pipeline().addLast(new ProtoBufServerHandler());
                        }
                    });

            // 繫結埠,同步等待成功
            ChannelFuture f = b.bind(port).sync();

            System.out.println("init start");
            // 等待服務端監聽埠關閉
            f.channel().closeFuture().sync();
        } finally {
            // 優雅退出,釋放執行緒池資源
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        int port = 8080;
        new ProtoBufServer().bind(port);
    }
}

服務端處理器

public class ProtoBufServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)
            throws Exception {
        //將上一個handler傳入的資料強制轉型,因為已經反序列化了
        PersonProto.Person req = (PersonProto.Person) msg;
        System.out.println("收到資料:name=" + req.getName() + ",email=" + req.getEmail());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        if (cause instanceof IOException) {
            System.out.println("遠端客戶端強迫關閉了一個現有的連線。");
        }
        ctx.close();
    }
}