1. 程式人生 > >Google Protobuf在Netty中的使用

Google Protobuf在Netty中的使用

連接 delay exce gin bootstrap 語言 sync socket cau

[toc]


Google Protobuf在Netty中的使用

程序代碼來自於《Netty權威指南》第8章,已經加了註釋。不過需要注意的是,使用的proto源代碼是在《Google Protobuf入門與使用》一文中生成的,關於protobuf代碼自動生成工具的使用也可以參考那篇文章。

例子中,通過解碼器ProtobufVarint32FrameDecoder和編碼器ProtobufVarint32LengthFieldPrepender的使用已經解決了半包問題,測試時可以把其註釋掉,這樣就可以演示Netty中使用Protobuf出現的TCP粘包問題。

同時,通過protobuf的使用,也可以深刻感受到,其在Netty中的使用確實非常簡單,編解碼、半包問題,只需要添加相關的處理器即可,而且它可以方便地實現跨語言的遠程服務調用。(protobuf本身提供了對不同語言的支持)

但其實在使用時會發現有一個問題,就是編解碼的對象是需要使用其生成的特定的proto對象來進行操作的,也就是說,需要編寫.proto文件,再通過protoc來生成相應語言的代碼文件,顯然這樣做還是會有些麻煩(雖然其實也還好,不算麻煩),有沒有方便點的方法呢?後面通過protostuff的使用即可解決這個問題。

服務端

SubReqServer.java

package cn.xpleaf.subscribe;

import cn.xpleaf.protobuf.SubscribeReqProto;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

/**
 * Netty server for the subscribe example.
 *
 * <p>The child pipeline decodes incoming bytes into
 * {@code SubscribeReqProto.SubscribeReq} messages, hands them to
 * {@link SubReqServerHandler}, and encodes outgoing Protobuf messages with a
 * varint32 length prefix.
 */
public class SubReqServer {

    /** Port used when no valid port is supplied on the command line. */
    private static final int DEFAULT_PORT = 8080;

    /**
     * Binds the server to {@code port} and blocks until the listening channel is closed.
     *
     * @param port TCP port to listen on
     * @throws Exception if binding fails or the waiting thread is interrupted
     */
    public void bind(int port) throws Exception {
        // Configure the server-side NIO thread groups.
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 1024)
                // Log events on the listening channel.
                .handler(new LoggingHandler(LogLevel.INFO))
                .childHandler(new ChannelInitializer<SocketChannel>() {

                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        // ProtobufVarint32FrameDecoder handles Protobuf half-packet (framing) issues.
                        ch.pipeline().addLast(new ProtobufVarint32FrameDecoder());
                        // ProtobufDecoder takes a com.google.protobuf.MessageLite prototype that tells
                        // it which target class to decode into; the raw bytes alone do not carry
                        // that type information. The server parses client requests, hence Req.
                        ch.pipeline().addLast(new ProtobufDecoder(SubscribeReqProto.SubscribeReq.getDefaultInstance()));
                        /*
                         * From the Netty source documentation (used for Protobuf half-packet handling):
                         *
                         * An encoder that prepends the Google Protocol Buffers
                         * <a href="https://developers.google.com/protocol-buffers/docs/encoding?csw=1#varints">Base
                         * 128 Varints</a> integer length field. For example:
                         * <pre>
                         * BEFORE ENCODE (300 bytes)       AFTER ENCODE (302 bytes)
                         * +---------------+               +--------+---------------+
                         * | Protobuf Data |-------------->| Length | Protobuf Data |
                         * |  (300 bytes)  |               | 0xAC02 |  (300 bytes)  |
                         * +---------------+               +--------+---------------+
                         * </pre>
                         */
                        ch.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender());
                        // ProtobufEncoder removes the need to encode SubscribeResp by hand.
                        ch.pipeline().addLast(new ProtobufEncoder());
                        // Business-logic handler.
                        ch.pipeline().addLast(new SubReqServerHandler());
                    }
                });

            // Bind the port and wait synchronously for the bind to succeed.
            ChannelFuture f = b.bind(port).sync();

            // Block until the server's listening channel is closed.
            f.channel().closeFuture().sync();
        } finally {
            // Graceful shutdown: release the thread-pool resources.
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    /**
     * Starts the server.
     *
     * @param args optional; {@code args[0]} is the port to listen on. When it is
     *             absent or not a valid integer, {@value #DEFAULT_PORT} is used.
     * @throws Exception if the server fails to start
     */
    public static void main(String[] args) throws Exception {
        int port = DEFAULT_PORT;
        if (args != null && args.length > 0) {
            try {
                // Parse the command-line argument (the original re-parsed the default value,
                // so a user-supplied port was never honored).
                port = Integer.parseInt(args[0]);
            } catch (NumberFormatException ignored) {
                // Keep the default port, but tell the user their argument was not used.
                System.err.println("Invalid port '" + args[0] + "', using default " + DEFAULT_PORT);
            }
        }
        new SubReqServer().bind(port);
    }
}

SubReqServerHandler.java

package cn.xpleaf.subscribe;

import cn.xpleaf.protobuf.SubscribeReqProto;
import cn.xpleaf.protobuf.SubscribeRespProto;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

/**
 * Server-side business handler: validates the user name on each decoded
 * subscribe request and replies with a {@code SubscribeResp}.
 */
public class SubReqServerHandler extends ChannelInboundHandlerAdapter {

    /**
     * Handles one decoded subscribe request.
     *
     * <p>ProtobufDecoder has already decoded the message, so the request can be used
     * directly after a cast. When the user name matches {@code "xpleaf"} (ignoring case)
     * the request is printed and a response is written back; because ProtobufEncoder is
     * in the pipeline the response does not need to be encoded by hand. Requests with
     * any other user name are dropped without a reply.
     *
     * @param ctx channel context used to write the response
     * @param msg the decoded message, expected to be a {@code SubscribeReqProto.SubscribeReq}
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        SubscribeReqProto.SubscribeReq req = (SubscribeReqProto.SubscribeReq) msg;
        String username = req.getUserName();
        if ("xpleaf".equalsIgnoreCase(username)) {
            System.out.println("Service accept client subscribe req : [" + req.toString() + "]");
            ctx.writeAndFlush(resp(req.getSubReqID()));
        }
    }

    /**
     * Builds the {@code SubscribeRespProto.SubscribeResp} for a request.
     *
     * @param subReqID id of the request being answered; echoed back in the response
     * @return a response with response code 0 and a fixed description
     */
    private SubscribeRespProto.SubscribeResp resp(int subReqID) {
        SubscribeRespProto.SubscribeResp.Builder builder = SubscribeRespProto.SubscribeResp.newBuilder();
        builder.setSubReqID(subReqID);
        builder.setRespCode(0);
        builder.setDesc("Netty book order succeed, 3 days later, sent to the designated address");
        return builder.build();
    }

    /**
     * Reports the failure and closes the connection.
     *
     * @param ctx   channel context of the failing channel
     * @param cause the exception that was raised in the pipeline
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // Surface the cause (same reporting as SubReqClientHandler) instead of
        // silently discarding it, then close the link.
        cause.printStackTrace();
        ctx.close();
    }
}

客戶端

SubReqClient.java

package cn.xpleaf.subscribe;

import cn.xpleaf.protobuf.SubscribeRespProto;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

/**
 * Netty client for the subscribe example.
 *
 * <p>The pipeline decodes incoming bytes into
 * {@code SubscribeRespProto.SubscribeResp} messages, hands them to
 * {@link SubReqClientHandler}, and encodes outgoing Protobuf messages with a
 * varint32 length prefix.
 */
public class SubReqClient {

    /** Port used when no valid port is supplied on the command line. */
    private static final int DEFAULT_PORT = 8080;

    /**
     * Connects to the server and blocks until the client channel is closed.
     *
     * @param host server host name or address
     * @param port server TCP port
     * @throws Exception if the connection fails or the waiting thread is interrupted
     */
    public void connect(String host, int port) throws Exception {
        // Configure the client-side NIO thread group.
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group).channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)
                // TCP connect timeout, in milliseconds.
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
                .handler(new ChannelInitializer<SocketChannel>() {

                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        // ProtobufVarint32FrameDecoder handles Protobuf half-packet (framing) issues.
                        ch.pipeline().addLast(new ProtobufVarint32FrameDecoder());
                        // ProtobufDecoder takes a com.google.protobuf.MessageLite prototype that tells
                        // it which target class to decode into; the raw bytes alone do not carry
                        // that type information. The client parses server responses, hence Resp.
                        ch.pipeline().addLast(new ProtobufDecoder(SubscribeRespProto.SubscribeResp.getDefaultInstance()));
                        /*
                         * From the Netty source documentation (used for Protobuf half-packet handling):
                         *
                         * An encoder that prepends the Google Protocol Buffers
                         * <a href="https://developers.google.com/protocol-buffers/docs/encoding?csw=1#varints">Base
                         * 128 Varints</a> integer length field. For example:
                         * <pre>
                         * BEFORE ENCODE (300 bytes)       AFTER ENCODE (302 bytes)
                         * +---------------+               +--------+---------------+
                         * | Protobuf Data |-------------->| Length | Protobuf Data |
                         * |  (300 bytes)  |               | 0xAC02 |  (300 bytes)  |
                         * +---------------+               +--------+---------------+
                         * </pre>
                         */
                        ch.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender());
                        // ProtobufEncoder removes the need to encode outgoing SubscribeReq messages by hand.
                        ch.pipeline().addLast(new ProtobufEncoder());
                        // Business-logic handler.
                        ch.pipeline().addLast(new SubReqClientHandler());
                    }
                });
            // Start the connect operation and wait for it to complete.
            ChannelFuture f = b.connect(host, port).sync();

            // Block until the client channel is closed.
            f.channel().closeFuture().sync();
        } finally {
            // Graceful shutdown: release the NIO thread group.
            group.shutdownGracefully();
        }
    }

    /**
     * Starts the client against {@code localhost}.
     *
     * @param args optional; {@code args[0]} is the server port. When it is
     *             absent or not a valid integer, {@value #DEFAULT_PORT} is used.
     * @throws Exception if the client fails to connect
     */
    public static void main(String[] args) throws Exception {
        int port = DEFAULT_PORT;
        if (args != null && args.length > 0) {
            try {
                // Parse the command-line argument (the original re-parsed the default value,
                // so a user-supplied port was never honored).
                port = Integer.parseInt(args[0]);
            } catch (NumberFormatException ignored) {
                // Keep the default port, but tell the user their argument was not used.
                System.err.println("Invalid port '" + args[0] + "', using default " + DEFAULT_PORT);
            }
        }
        new SubReqClient().connect("localhost", port);
    }
}

SubReqClientHandler.java

package cn.xpleaf.subscribe;

import java.util.ArrayList;
import java.util.List;

import cn.xpleaf.protobuf.SubscribeReqProto;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

/**
 * Client-side business handler: sends ten subscribe requests once the channel
 * becomes active and prints every response it receives.
 */
public class SubReqClientHandler extends ChannelInboundHandlerAdapter {

    /**
     * Queues ten subscribe requests (ids 0 through 9) and flushes them in one go.
     *
     * @param ctx channel context used to write the requests
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        int requestId = 0;
        while (requestId < 10) {
            ctx.write(subReq(requestId));
            requestId++;
        }
        ctx.flush();
    }

    /**
     * Builds a {@code SubscribeReqProto.SubscribeReq} with fixed user, product and
     * address data.
     *
     * @param i value stored as the request's subReqID
     * @return the populated request message
     */
    private SubscribeReqProto.SubscribeReq subReq(int i) {
        List<String> deliveryAddresses = new ArrayList<>();
        deliveryAddresses.add("NanJing YuHuaTai");
        deliveryAddresses.add("BeiJing LiuLiChange");
        deliveryAddresses.add("ShenZhen HongShuLin");

        return SubscribeReqProto.SubscribeReq.newBuilder()
                .setSubReqID(i)
                .setUserName("xpleaf")
                .setProductName("Netty Book For Protobuf")
                .addAllAddress(deliveryAddresses)
                .build();
    }

    /**
     * Prints the message received from the server.
     *
     * @param ctx channel context (unused)
     * @param msg the inbound message, printed via string concatenation
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        String line = "Service accept server subscribe response : [" + msg + "]";
        System.out.println(line);
    }

    /**
     * Flushes the channel after a batch of reads completes.
     *
     * @param ctx channel context to flush
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    /**
     * Prints the failure's stack trace and closes the connection.
     *
     * @param ctx   channel context of the failing channel
     * @param cause the exception that was raised in the pipeline
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

測試

服務端輸出如下:

Service accept client subscribe req : [subReqID: 0
userName: "xpleaf"
productName: "Netty Book For Protobuf"
address: "NanJing YuHuaTai"
address: "BeiJing LiuLiChange"
address: "ShenZhen HongShuLin"
]
Service accept client subscribe req : [subReqID: 1
userName: "xpleaf"
productName: "Netty Book For Protobuf"
address: "NanJing YuHuaTai"
address: "BeiJing LiuLiChange"
address: "ShenZhen HongShuLin"
]
Service accept client subscribe req : [subReqID: 2
userName: "xpleaf"
productName: "Netty Book For Protobuf"
address: "NanJing YuHuaTai"
address: "BeiJing LiuLiChange"
address: "ShenZhen HongShuLin"
]
Service accept client subscribe req : [subReqID: 3
userName: "xpleaf"
productName: "Netty Book For Protobuf"
address: "NanJing YuHuaTai"
address: "BeiJing LiuLiChange"
address: "ShenZhen HongShuLin"
]
Service accept client subscribe req : [subReqID: 4
userName: "xpleaf"
productName: "Netty Book For Protobuf"
address: "NanJing YuHuaTai"
address: "BeiJing LiuLiChange"
address: "ShenZhen HongShuLin"
]
Service accept client subscribe req : [subReqID: 5
userName: "xpleaf"
productName: "Netty Book For Protobuf"
address: "NanJing YuHuaTai"
address: "BeiJing LiuLiChange"
address: "ShenZhen HongShuLin"
]
Service accept client subscribe req : [subReqID: 6
userName: "xpleaf"
productName: "Netty Book For Protobuf"
address: "NanJing YuHuaTai"
address: "BeiJing LiuLiChange"
address: "ShenZhen HongShuLin"
]
Service accept client subscribe req : [subReqID: 7
userName: "xpleaf"
productName: "Netty Book For Protobuf"
address: "NanJing YuHuaTai"
address: "BeiJing LiuLiChange"
address: "ShenZhen HongShuLin"
]
Service accept client subscribe req : [subReqID: 8
userName: "xpleaf"
productName: "Netty Book For Protobuf"
address: "NanJing YuHuaTai"
address: "BeiJing LiuLiChange"
address: "ShenZhen HongShuLin"
]
Service accept client subscribe req : [subReqID: 9
userName: "xpleaf"
productName: "Netty Book For Protobuf"
address: "NanJing YuHuaTai"
address: "BeiJing LiuLiChange"
address: "ShenZhen HongShuLin"
]

客戶端輸出如下:

Service accept server subscribe response : [subReqID: 0
respCode: 0
desc: "Netty book order succeed, 3 days later, sent to the designated address"
]
Service accept server subscribe response : [subReqID: 1
respCode: 0
desc: "Netty book order succeed, 3 days later, sent to the designated address"
]
Service accept server subscribe response : [subReqID: 2
respCode: 0
desc: "Netty book order succeed, 3 days later, sent to the designated address"
]
Service accept server subscribe response : [subReqID: 3
respCode: 0
desc: "Netty book order succeed, 3 days later, sent to the designated address"
]
Service accept server subscribe response : [subReqID: 4
respCode: 0
desc: "Netty book order succeed, 3 days later, sent to the designated address"
]
Service accept server subscribe response : [subReqID: 5
respCode: 0
desc: "Netty book order succeed, 3 days later, sent to the designated address"
]
Service accept server subscribe response : [subReqID: 6
respCode: 0
desc: "Netty book order succeed, 3 days later, sent to the designated address"
]
Service accept server subscribe response : [subReqID: 7
respCode: 0
desc: "Netty book order succeed, 3 days later, sent to the designated address"
]
Service accept server subscribe response : [subReqID: 8
respCode: 0
desc: "Netty book order succeed, 3 days later, sent to the designated address"
]
Service accept server subscribe response : [subReqID: 9
respCode: 0
desc: "Netty book order succeed, 3 days later, sent to the designated address"
]

Google Protobuf在Netty中的使用