1. 程式人生 > >五、通過Protobuf整合Netty實現對協議訊息客戶端與伺服器通訊實戰

五、通過Protobuf整合Netty實現對協議訊息客戶端與伺服器通訊實戰

目錄

一、Protocol Buffers 是什麼?

二、Protocol Buffers 檔案和訊息詳解

三、專案實戰,直接掌握的protobuf應用。


一、Protocol Buffers 是什麼?

        1、官網翻譯之後如下:

               Protocol Buffers 是一種輕便高效的結構化資料儲存格式,可以用於結構化資料序列化,或者說序列化。它很適合做資料儲存或 RPC 資料交換格式。可用於通訊協議、資料儲存等領域的語言無關、平臺無關、可擴充套件的序列化結構資料格式。目前提供了 C++、Java、Python 三種語言的 API。

        2、RPC:Remote Procedure Call ,遠端過程呼叫,很多RPC框架是跨語言的。
             (1)、定義一個介面檔案:描述了物件(結構體)、物件成員,介面方法等一系列資訊。
             (2)、通過RPC框架所提供的編譯器,將介面說明檔案編譯成具體的語言檔案。
             (3)、在客戶端與伺服器端分別引入RPC編譯器所生產的檔案,即可像呼叫本地方法一樣呼叫遠端方法。

二、Protocol Buffers 檔案和訊息詳解

     1、安裝解壓配置環境,就直接省略了,太簡單了。

      2、my_example.proto 配置檔案詳解。 

syntax = "proto2";
//包名
package tutorial; 
//以下面包名為主,可以省略,如果省略以tutoria1為包名
option java_package = "com.example.tutorial";
//生成一個類名字,是public 的外部類的名字,如果不定義則會預設會取檔名字
//以駝峰的形式顯示出來MyExample作為類名。
option java_outer_classname = "AddressBookProtos";

//下面的類都是java_outer_classname的內部類
message Person {
  required string name = 1;
  required int32 id = 2;
  optional string email = 3;

  enum PhoneType {
    MOBILE = 0;
    HOME = 1;
    WORK = 2;
  }

  message PhoneNumber {
    required string number = 1;
    optional PhoneType type = 2 [default = HOME];
  }

  repeated PhoneNumber phones = 4;
}

message AddressBook {
  repeated Person people = 1;
}

required:
必須提供欄位的值,否則訊息將被視為“未初始化”。嘗試構建一個未初始化的訊息將拋
出一個RuntimeException。解析未初始化的訊息將丟擲IOException。除此之外,
所需的欄位與可選欄位完全一樣。
ps:谷歌的一些工程師已經得出結論,使用required弊大於利;他們喜歡只使用可選的和重複的。
然而,這種觀點並不普遍。

optional:
該欄位可能也可能不設定。如果未設定可選欄位值,則使用預設值。對於簡單型別來說,
您可以指定自己的預設值,就像我們在例子中為電話號碼型別所做的那樣。否則,就會
使用系統預設值:數值型別為0,字串為空字串,bools為false。對於嵌入式訊息
來說,預設值始終是訊息的“預設例項”或“prototype”,它的欄位沒有設定。呼叫
accessor來獲取未顯式設定的可選(或必需)欄位的值總是返回該欄位的預設值。

repeated:
該欄位可以多次重複(包括零)。重複值的順序將保留在協議緩衝區中。把重複的欄位
看作是動態大小的陣列.

 

編譯.proto檔案
protoc --java_out= src/main/java(輸出路徑)  src/protobuf/TransData.proto(目標檔案)

 

三、專案實戰,直接掌握的protobuf應用。

專案總覽:

syntax = "proto2";

package com.zhurong.protobuf;

option optimize_for = SPEED;
option java_package = "com.zhurong.protobuf";
option java_outer_classname = "TransData";

message MessageIndex{
    enum MessageType{
        MessageBodyPartIndex1 = 1;
        MessageBodyPartIndex2 = 2;
        MessageBodyPartIndex3 = 3;
    }
    required MessageType message_type = 1;
    oneof messageBody{
        MessageBodyPart1 messageBodyPart1 = 2;
        MessageBodyPart2 messageBodyPart2 = 3;
        MessageBodyPart3 messageBodyPart3 = 4;
    }
}
message MessageBodyPart1 {
    required string name = 1;
    required int32 id = 2;
    optional string email = 3;
 }

message MessageBodyPart2 {
    required string name = 1;
    required int32 id = 2;
    optional string email = 3;
}

message MessageBodyPart3 {
    required string name = 1;
    required int32 id = 2;
    optional string email = 3;
}

       

生成java類如下:

// Generated by the protocol buffer compiler.  DO NOT EDIT!
// source: src/protobuf/TransData.proto

package com.zhurong.protobuf.message;

public final class TransData {
  private TransData() {}
  public static void registerAllExtensions(
      com.google.protobuf.ExtensionRegistryLite registry) {
  }

  public static void registerAllExtensions(
      com.google.protobuf.ExtensionRegistry registry) {
    registerAllExtensions(
        (com.google.protobuf.ExtensionRegistryLite) registry);
  }
  public interface MessageIndexOrBuilder extends
      // @@protoc_insertion_point(interface_extends:com.zhurong.protobuf.MessageIndex)
      com.google.protobuf.MessageOrBuilder {

    /**
     * <code>required .com.zhurong.protobuf.MessageIndex.MessageType message_type = 1;</code>
     */
    boolean hasMessageType();
    /**
     * <code>required .com.zhurong.protobuf.MessageIndex.MessageType message_type = 1;</code>
     */
    TransData.MessageIndex.MessageType getMessageType();

    /**
     * <code>optional .com.zhurong.protobuf.MessageBodyPart1 messageBodyPart1 = 2;</code>
     */
    boolean hasMessageBodyPart1();
    /**
     * <code>optional .com.zhurong.protobuf.MessageBodyPart1 messageBodyPart1 = 2;</code>
     */
    TransData.MessageBodyPart1 getMessageBodyPart1();
    /**
     * <code>optional .com.zhurong.protobuf.MessageBodyPart1 messageBodyPart1 = 2;</code>
     */
    TransData.MessageBodyPart1OrBuilder getMessageBodyPart1OrBuilder();

  // @@protoc_insertion_point(outer_class_scope)

    .. .. ..  .  . . . . . .
    .. .. ..  .  . . . . . .
    .. .. ..  .  . . . . . .
    .. .. ..  .  . . . . . .
    .. .. ..  .  . . . . . .
       <省略>
}

伺服器端程式碼:

package com.zhurong.protobuf.server;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

/**
 * Description:
 * User: zhurong
 * Date: 2018-09-24  23:17
 */
public class NettyServerProtoBuf {

    public static void main(String[] args) {
        //接收連線
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        //連線傳送給work
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            System.out.println("伺服器啟動成功!");
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).
                    handler(new LoggingHandler(LogLevel.INFO)).
                    childHandler(new NettyServerProtobufInitializer());
            ChannelFuture channelFuture = serverBootstrap.bind(8000).sync();
            channelFuture.channel().closeFuture().sync();

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }

    }

}

package com.zhurong.protobuf.server;

import com.zhurong.protobuf.message.TransData;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;

/**
 * Description:
 * User: zhurong
 * Date: 2018-10-14  9:53
 */
public class NettyServerProtobufInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline channelPipeline = ch.pipeline();

        channelPipeline.addLast(new ProtobufVarint32FrameDecoder());
        channelPipeline.addLast(new ProtobufDecoder(TransData.MessageIndex.getDefaultInstance()));
        channelPipeline.addLast(new ProtobufVarint32LengthFieldPrepender());
        channelPipeline.addLast(new ProtobufEncoder());
        channelPipeline.addLast(new NettyServerProtoBufHandler());
    }
}

package com.zhurong.protobuf.server;

import com.zhurong.protobuf.message.TransData;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.concurrent.EventExecutorGroup;

/**
 * Description:
 * User: zhurong
 * Date: 2018-10-14  9:59
 */
public class NettyServerProtoBufHandler extends SimpleChannelInboundHandler<TransData.MessageIndex> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TransData.MessageIndex msg) throws Exception {
        if((TransData.MessageIndex.MessageType)msg.getMessageType()
                == TransData.MessageIndex.MessageType.MessageBodyPartIndex1){
            System.out.println(msg.getMessageBodyPart1().getId());
            System.out.println(msg.getMessageBodyPart1().getName());
            System.out.println(msg.getMessageBodyPart1().getEmail());
        }

    }
}

 

客戶端程式碼:

package com.zhurong.protobuf.server;

import com.zhurong.protobuf.message.TransData;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.concurrent.EventExecutorGroup;

/**
 * Description:
 * User: zhurong
 * Date: 2018-10-14  9:59
 */
public class NettyServerProtoBufHandler extends SimpleChannelInboundHandler<TransData.MessageIndex> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TransData.MessageIndex msg) throws Exception {
        if((TransData.MessageIndex.MessageType)msg.getMessageType()
                == TransData.MessageIndex.MessageType.MessageBodyPartIndex1){
            System.out.println(msg.getMessageBodyPart1().getId());
            System.out.println(msg.getMessageBodyPart1().getName());
            System.out.println(msg.getMessageBodyPart1().getEmail());
        }

    }
}


package com.zhurong.protobuf.client;

import com.zhurong.protobuf.message.TransData;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

/**
 * Description:
 * User: zhurong
 * Date: 2018-10-14  20:53
 */
public class NettyClientProtoBufHandler extends SimpleChannelInboundHandler<TransData.MessageIndex>{

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TransData.MessageIndex msg) throws Exception {

    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {

        TransData.MessageIndex messageIndex = TransData.MessageIndex.newBuilder()
                .setMessageType(TransData.MessageIndex.MessageType.MessageBodyPartIndex1)
                .setMessageBodyPart1(TransData.MessageBodyPart1.newBuilder()
                .setId(11111).setEmail("23242343255").setName("jim").build())
                .build();

        ctx.channel().writeAndFlush(messageIndex);
    }
}

package com.zhurong.protobuf.client;

import com.zhurong.protobuf.message.TransData;
import com.zhurong.protobuf.server.NettyServerProtoBufHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;

/**
 *  Netstat –ano|findstr
 * Description: 客戶端與伺服器端連線一旦建立,這個類中方法就會被回撥
 * User: zhurong
 * Date: 2018-09-24  21:29
 */
public class NettyClientProtoBufInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline channelPipeline = ch.pipeline();

        channelPipeline.addLast(new ProtobufVarint32FrameDecoder());
        channelPipeline.addLast(new ProtobufDecoder(TransData.MessageIndex.getDefaultInstance()));
        channelPipeline.addLast(new ProtobufVarint32LengthFieldPrepender());
        channelPipeline.addLast(new ProtobufEncoder());
        channelPipeline.addLast(new NettyClientProtoBufHandler());
    }
}

多協議訊息解決方法就是在訊息最前面加一個標識,告訴伺服器這個哪一個訊息體,當然這個是屬於自定義編解碼格式。

上面我只是寫了一個分支,完整的是後面2個body訊息體,都應該加上去放在else if裡面。此處就不過多講解了。到這裡protobuf使用,應該是掌握了吧!