1. 程式人生 > >一起學Netty(十)之 Netty使用Google的ProtoBuf

一起學Netty(十)之 Netty使用Google的ProtoBuf

protobuf是由Google開發的一套對資料結構進行序列化的方法,可用做通訊協議,資料儲存格式,等等。其特點是不限語言、不限平臺、擴充套件性強

Netty也提供了對Protobuf的天然支援,我們今天就寫一個簡單的示例,簡單地瞭解一下Netty對Google的protoBuf的支援

我們的示例場景很簡單的:客戶端傳送一個資訊,這個資訊用Protobuf來做序列化,然後伺服器端接收這個資訊,解碼,讀取資訊

protobuf與xml,json這樣的資料格式一樣,都有自己的一套語法,且語法很簡單,很容易掌握,xml檔案的字尾名是xml,json的字尾名是json,以此類推,那麼protobuf的字尾名就是proto

關於proto的基本語法與java的bean很像,詳細可以參考官網,可以看下這篇部落格:

http://blog.sina.com.cn/s/blog_9b0604b40101qm35.html

現在我們定義一個類似java bean的proto檔案,我們定義一個“富人”類,他有多輛車,我們先按照語法,寫一個RichMan.proto,如下面的程式碼清單所示:

package netty;

option java_package = "com.lyncc.netty.codec.protobuf.demo";
option java_outer_classname = "RichManProto";

message RichMan {

   required int32 id = 1;
   required string name = 2;
   optional string email = 3;
   
   enum CarType {
     AUDI = 0;
     BENZ = 1;
     LAMBORGHINI = 2;
     DASAUTO = 3;
   }
   
   message Car {
      required string name = 1;
      optional CarType type = 2 [default = BENZ];
   }
   
   repeated Car cars = 4;
   
}
給出上面程式碼的一些基本解釋:

1)java_package值得是該檔案生成的java檔案的包路徑

2)java_outer_classname值的是生成的class的名稱

3)message和enum是它的基本型別,很類似於java的class和列舉

4)required表名這個欄位是必須的,option表明這個欄位可選,default表明這個欄位有預設值

5)repeat表明這個欄位可以重複,類似於java中的List,該例子中Car的宣告中,就相當於java中的List<Car>

6)每個宣告的後面的數字,例如1,2,3, 4等等,同級的宣告不能重複

總而言之,這個“類”定義了一個富人,該富人有id,名稱,郵箱,而且該富人有多個名車,這些名車的型別有奧迪,賓士,蘭博基尼,大眾

好了,到目前為止,proto我們已經定義好了,Google提供了一個類似指令碼的工具,可以使我們將proto檔案轉化成java檔案

該檔案叫做protoc-2.6.1-win32.zip,可以在很多地方下載到,下載地址:

http://download.csdn.net/detail/linuu/9515171

下載好,新建資料夾,且將載入的exe複製到該資料夾,且將我們剛才寫的RichMan.proto複製到該資料夾下:


進入命令列,鍵入:


沒有報錯的情況下,會在同樣的資料夾下生成如下的檔案:


進入com的資料夾,你會發現生成的目錄是與你proto中定義的java_package一樣:


好了,到目前為止,我們已經生成了RichManProto檔案了,將其複製到eclipse對應的目錄下,整個專案程式碼的縮圖如下圖所示:


新增maven的依賴:

<dependency>
	<groupId>com.google.protobuf</groupId>
	<artifactId>protobuf-java</artifactId>
	<version>2.6.1</version>
</dependency>
注意就是版本必須是2.6.1,因為我們用的是protoc-2.6.1的exe去編譯的,所以版本必須保持一致,否則有可能會報錯

接下來就是一些大家耳熟能詳的server,handler,client,bootstrap,廢話多不說,上程式碼:

ProtoBufServer.java

package com.lyncc.netty.codec.protobuf.demo;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

public class ProtoBufServer {
    
    public void bind(int port) throws Exception {
        // 配置服務端的NIO執行緒組
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 100)
                    .handler(new LoggingHandler(LogLevel.INFO)).childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) {
                            ch.pipeline().addLast(new ProtobufVarint32FrameDecoder());
                            ch.pipeline().addLast(new ProtobufDecoder(RichManProto.RichMan.getDefaultInstance()));
                            ch.pipeline().addLast(new ProtoBufServerHandler());
                        }
                    });

            // 繫結埠,同步等待成功
            ChannelFuture f = b.bind(port).sync();

            System.out.println("init start");
            // 等待服務端監聽埠關閉
            f.channel().closeFuture().sync();
        } finally {
            // 優雅退出,釋放執行緒池資源
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        int port = 8080;
        if (args != null && args.length > 0) {
            try {
                port = Integer.valueOf(args[0]);
            } catch (NumberFormatException e) {
                // 採用預設值
            }
        }
        new ProtoBufServer().bind(port);
    }

}
可以看見,這個sever的bootstrap與其他的sever很相似,只是channelPipeline中在自定義的handler之前添加了netty對protobuf支援的兩個天然的decoder

我沒有深究,看名字就知道第一個Decoder是將幀byte資料轉化成message,第二步就是將message轉化成我們自定義的Rimanproto

自定義的handler很簡單,就是列印一下解析的內容:

ProtoBufServerHandler.java

package com.lyncc.netty.codec.protobuf.demo;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.util.List;

import com.lyncc.netty.codec.protobuf.demo.RichManProto.RichMan.Car;

public class ProtoBufServerHandler extends ChannelInboundHandlerAdapter {
    
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        RichManProto.RichMan req = (RichManProto.RichMan) msg;
        System.out.println(req.getName()+"他有"+req.getCarsCount()+"量車");
        List<Car> lists = req.getCarsList();
        if(null != lists) {
            
            for(Car car : lists){
                System.out.println(car.getName());
            }
        }
    }


    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close(); 
    }

}
客戶端程式碼

ProtoBufClient.java

package com.lyncc.netty.codec.protobuf.demo;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;

public class ProtoBufClient {
    
    public void connect(int port, String host) throws Exception {
        // 配置客戶端NIO執行緒組
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender());
                            ch.pipeline().addLast(new ProtobufEncoder());
                            ch.pipeline().addLast(new ProtoBufClientHandler());
                        }
                    });

            // 發起非同步連線操作
            ChannelFuture f = b.connect(host, port).sync();

            // 當代客戶端鏈路關閉
            f.channel().closeFuture().sync();
        } finally {
            // 優雅退出,釋放NIO執行緒組
            group.shutdownGracefully();
        }
    }

    /**
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        int port = 8080;
        if (args != null && args.length > 0) {
            try {
                port = Integer.valueOf(args[0]);
            } catch (NumberFormatException e) {
                // 採用預設值
            }
        }
        new ProtoBufClient().connect(port, "127.0.0.1");
    }

}
需要注意的是:

在傳輸之前需要將你的類進行protobuf的序列化,這是兩個序列化的編碼器

接著看:

ProtoBufClientHandler.java

package com.lyncc.netty.codec.protobuf.demo;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.util.ArrayList;
import java.util.List;

import com.lyncc.netty.codec.protobuf.demo.RichManProto.RichMan.Car;
import com.lyncc.netty.codec.protobuf.demo.RichManProto.RichMan.CarType;

public class ProtoBufClientHandler extends ChannelInboundHandlerAdapter {
    
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        System.out.println("=======================================");
        RichManProto.RichMan.Builder builder = RichManProto.RichMan.newBuilder();
        builder.setName("王思聰");
        builder.setId(1);
        builder.setEmail("[email protected]");
        
        List<RichManProto.RichMan.Car> cars = new ArrayList<RichManProto.RichMan.Car>();
        Car car1 = RichManProto.RichMan.Car.newBuilder().setName("上海大眾超跑").setType(CarType.DASAUTO).build();
        Car car2 = RichManProto.RichMan.Car.newBuilder().setName("Aventador").setType(CarType.LAMBORGHINI).build();
        Car car3 = RichManProto.RichMan.Car.newBuilder().setName("賓士SLS級AMG").setType(CarType.BENZ).build();
        
        cars.add(car1);
        cars.add(car2);
        cars.add(car3);
        
        builder.addAllCars(cars);
        ctx.writeAndFlush(builder.build());
    }


    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }

}

好了,到此為止,所有的程式碼已經寫完畢了,我們執行測試一下:

伺服器端啟動:

啟動客戶端後,再看看伺服器端的控制檯:


好了,到此為止,最簡單的demo已經搭建完畢了~