1. 程式人生 > >高效能NIO框架Netty-整合Protobuf高效能資料傳輸

高效能NIO框架Netty-整合Protobuf高效能資料傳輸

前言

上篇文章我們整合了kryo來進行資料的傳輸編解碼,今天將繼續學習使用Protobuf來編解碼。Netty對Protobuf的支援比較好,還提供了Protobuf的編解碼器,非常方便。

Protobuf介紹

Protobuf是google開源的專案,全稱 Google Protocol Buffers,特點如下:
- 支援跨平臺多語言,支援目前絕大多數語言例如C++、C#、Java、pthyon等
- 高效能,可靠性高,google出品有保障
- 使用protobuf編譯器能自動生成程式碼,但需要編寫proto檔案,需要一點學習成本

Protobuf使用

Protobuf是將類的定義使用.proto檔案進行描述,然後通過protoc.exe編譯器,根據.proto自動生成.java檔案,然後將生成的.java檔案拷貝到專案中使用即可。

protoc.exe編譯器下載

下載完成之後放到磁碟上進行解壓,可以將protoc.exe配置到環境變數中去,這樣就可以直接在cmd命令列中使用protoc命令,也可以不用配置,直接到解壓後的protoc\bin目錄下進行檔案的編譯。

下面我們基於之前的Message物件來構建一個Message.proto檔案。

syntax = "proto3";
option java_outer_classname = "MessageProto";
message Message {  
  string id = 1;
  string content = 2;
}

syntax 宣告可以選擇protobuf的編譯器版本(v2和v3)


- syntax=”proto2”;選擇2版本
- syntax=”proto3”;選擇3版本

option java_outer_classname=”MessageProto”用來指定生成的java類的類名。
message相當於c語言中的struct語句,表示定義一個資訊,其實也就是類。
message裡面的資訊就是我們要傳輸的欄位了,子段後面需要有一個數字編號,從1開始遞增

.proto檔案定好之後就可以用編譯器進行編譯,輸出我們要使用的Java類,我們這邊不配置環境變數,直接到解壓包的bin目錄下進行操作

首先將我們的Message.proto檔案複製到bin目錄下,然後在這個目錄下開啟CMD視窗,輸入下面的命令進行編譯操作:

protoc ./Message.proto --java_out=./

–java_out是輸出目錄,我們就輸出到當前目錄下,執行完之後可以看到bin目錄下多了一個MessageProto.java檔案,把這個檔案複製到專案中使用即可。

Nettty整合Protobuf

首先加入Protobuf的Maven依賴

<!-- https://mvnrepository.com/artifact/com.google.protobuf/protobuf-java -->
<dependency>
   <groupId>com.google.protobuf</groupId>
   <artifactId>protobuf-java</artifactId>
   <version>3.5.1</version>
</dependency>

建立一個Proto的Server資料處理類,之前的已經不能用了,因為現在傳輸的物件是MessageProto這個物件了

public class ServerPoHandlerProto extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        MessageProto.Message message = (MessageProto.Message) msg;
        if (ConnectionPool.getChannel(message.getId()) == null) {
            ConnectionPool.putChannel(message.getId(), ctx);
        }
        System.err.println("server:" + message.getId());
        ctx.writeAndFlush(message);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

改造服務端啟動程式碼,增加protobuf編解碼器,是Netty自帶的,不用我們去自定義了

public class ImServer {

    public void run(int port) {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() { 
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        // 實體類傳輸資料,protobuf序列化
                        ch.pipeline().addLast("decoder",  
                                new ProtobufDecoder(MessageProto.Message.getDefaultInstance()));  
                        ch.pipeline().addLast("encoder",  
                                new ProtobufEncoder());  
                        ch.pipeline().addLast(new ServerPoHandlerProto());

                    }
                })
                .option(ChannelOption.SO_BACKLOG, 128)
                .childOption(ChannelOption.SO_KEEPALIVE, true);

        try {
            ChannelFuture f = bootstrap.bind(port).sync();
             f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

}

服務端改造完了,下面需要把客戶的的Handler和編解碼器也改成protobuf的就行了,廢話不多說,直接上程式碼

public class ImConnection {

    private Channel channel;

    public Channel connect(String host, int port) {
        doConnect(host, port);
        return this.channel;
    }

    private void doConnect(String host, int port) {
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(workerGroup);
            b.channel(NioSocketChannel.class);
            b.option(ChannelOption.SO_KEEPALIVE, true);
            b.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    // 實體類傳輸資料,protobuf序列化
                    ch.pipeline().addLast("decoder",  
                            new ProtobufDecoder(MessageProto.Message.getDefaultInstance()));  
                    ch.pipeline().addLast("encoder",  
                            new ProtobufEncoder());  
                    ch.pipeline().addLast(new ClientPoHandlerProto());

                }
            });

            ChannelFuture f = b.connect(host, port).sync();
            channel = f.channel();
        } catch(Exception e) {
            e.printStackTrace();
        }
    }

}

客戶的資料處理類:

public class ClientPoHandlerProto extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        MessageProto.Message message = (MessageProto.Message) msg;
        System.out.println("client:" + message.getContent());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }

}

最後一步就開始測試了,需要將客戶的傳送訊息的地方改成MessageProto.Message物件,程式碼如下:

/**
 * IM 客戶端啟動入口
 * @author yinjihuan
 */
public class ImClientApp {
    public static void main(String[] args) {
        String host = "127.0.0.1";
        int port = 2222;
        Channel channel = new ImConnection().connect(host, port);
        String id = UUID.randomUUID().toString().replaceAll("-", "");
        // protobuf
        MessageProto.Message message = MessageProto.Message.newBuilder().setId(id).setContent("hello yinjihuan").build();
        channel.writeAndFlush(message);
    }
}

更多技術分享請關注微信公眾號:猿天地
猿天地微信公眾號