Google Protobuf在Netty中的使用
阿新 • • 發佈:2018-02-15
連接 delay exce gin bootstrap 語言 sync socket cau [toc]
Google Protobuf在Netty中的使用
程序代碼來自於《Netty權威指南》第8章,已經加了註釋。不過需要注意的是,使用的proto源代碼是在《Google Protobuf入門與使用》一文中生成的,關於protobuf代碼自動生成工具的使用可以參考該文章。
例子中,通過解碼器ProtobufVarint32FrameDecoder
和編碼器ProtobufVarint32LengthFieldPrepender
的使用已經解決了半包問題,測試時可以把其註釋掉,這樣就可以演示Netty中使用Protobuf出現的TCP粘包問題。
同時,通過protobuf的使用,也可以深刻感受到,其在Netty中的使用確實非常簡單,編解碼、半包問題,只需要添加相關的處理器即可,而且它可以方便地實現跨語言的遠程服務調用。(protobuf本身提供了對不同語言的支持)
但其實在使用時會發現有一個問題,就是編解碼的對象是需要使用其生成的特定的proto對象來進行操作的,也就是說,需要編寫.proto文件,再通過protoc來生成相應語言的代碼文件,顯然這樣做還是會有些麻煩(雖然其實也還好,不算麻煩),有沒有方便點的方法呢?後面通過protostuff的使用即可解決這個問題。
服務端
SubReqServer.java
package cn.xpleaf.subscribe; import cn.xpleaf.protobuf.SubscribeReqProto; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.protobuf.ProtobufDecoder; import io.netty.handler.codec.protobuf.ProtobufEncoder; import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder; import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; public class SubReqServer { public void bind(int port) throws Exception { // 配置服務端的NIO線程組 EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) // 添加日誌處理器 .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { // 添加ProtobufVarint32FrameDecoder,主要用於Protobuf的半包處理 ch.pipeline().addLast(new ProtobufVarint32FrameDecoder()); // 添加ProtobufDecoder解碼器,它的參數是com.google.protobuf.MessageLite // 實際上就是要告訴ProtobufDecoder需要解碼的目標類是什麽,否則僅僅從字節數組中是 // 無法判斷出要解碼的目標類型信息的(服務端需要解析的是客戶端請求,所以是Req) ch.pipeline().addLast(new ProtobufDecoder(SubscribeReqProto.SubscribeReq.getDefaultInstance())); /** * 來自源碼的代碼註釋,用於Protobuf的半包處理 * * An encoder that prepends the the Google Protocol Buffers * <a href="https://developers.google.com/protocol-buffers/docs/encoding?csw=1#varints">Base * 128 Varints</a> integer length field. 
For example: * <pre> * BEFORE ENCODE (300 bytes) AFTER ENCODE (302 bytes) * +---------------+ +--------+---------------+ * | Protobuf Data |-------------->| Length | Protobuf Data | * | (300 bytes) | | 0xAC02 | (300 bytes) | * +---------------+ +--------+---------------+ * </pre> * */ ch.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender()); // 添加ProtobufEncoder編碼器,這樣就不需要對SubscribeResp進行手工編碼 ch.pipeline().addLast(new ProtobufEncoder()); // 添加業務處理handler ch.pipeline().addLast(new SubReqServerHandler()); } }); // 綁定端口,同步等待成功 ChannelFuture f = b.bind(port).sync(); // 等待服務端監聽端口關閉 f.channel().closeFuture().sync(); } finally { // 優雅退出,釋放線程池資源 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } public static void main(String[] args) throws Exception { int port = 8080; if(args != null && args.length > 0) { try { port = Integer.valueOf(port); } catch (NumberFormatException e) { // TODO: handle exception } } new SubReqServer().bind(port); } }
SubReqServerHandler.java
package cn.xpleaf.subscribe; import cn.xpleaf.protobuf.SubscribeReqProto; import cn.xpleaf.protobuf.SubscribeRespProto; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; public class SubReqServerHandler extends ChannelInboundHandlerAdapter { /** * 由於ProtobufDecoder已經對消息進行了自動解碼,因此接收到的訂購請求消息可以直接使用 * 對用戶名進行校驗,校驗通過後構造應答消息返回給客戶端,由於使用了ProtobufEncoder, * 所以不需要對SubscribeRespProto.SubscribeResp進行手工編碼 */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { SubscribeReqProto.SubscribeReq req = (SubscribeReqProto.SubscribeReq)msg; String username = req.getUserName(); if("xpleaf".equalsIgnoreCase(username)) { System.out.println("Service accept client subscribe req : [" + req.toString() + "]"); ctx.writeAndFlush(resp(req.getSubReqID())); } } /** * 構建SubscribeRespProto.SubscribeResp對象 * @param subReqID * @return */ private SubscribeRespProto.SubscribeResp resp(int subReqID) { SubscribeRespProto.SubscribeResp.Builder builder = SubscribeRespProto.SubscribeResp.newBuilder(); builder.setSubReqID(subReqID); builder.setRespCode(0); builder.setDesc("Netty book order succeed, 3 days later, sent to the designated address"); return builder.build(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // 發生異常,關閉鏈路 ctx.close(); } }
客戶端
SubReqClient.java
package cn.xpleaf.subscribe;
import cn.xpleaf.protobuf.SubscribeRespProto;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
/**
 * Netty client that connects to the subscribe server and exchanges
 * protobuf-encoded messages through {@link SubReqClientHandler}.
 */
public class SubReqClient {

    /** Port used when no valid port is supplied on the command line. */
    private static final int DEFAULT_PORT = 8080;

    /**
     * Connects to the server and blocks until the connection is closed.
     *
     * @param host server host name or address
     * @param port server TCP port
     * @throws Exception if the connection fails or the waiting thread is interrupted
     */
    public void connect(String host, int port) throws Exception {
        // Configure the client-side NIO thread group.
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group).channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    // TCP connect timeout.
                    .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            // Inbound: frame decoder that deals with protobuf half packets.
                            ch.pipeline().addLast(new ProtobufVarint32FrameDecoder());
                            // Inbound: ProtobufDecoder must be told the target message type,
                            // because the raw bytes alone do not identify it. The client
                            // decodes server responses, hence SubscribeResp.
                            ch.pipeline().addLast(
                                    new ProtobufDecoder(SubscribeRespProto.SubscribeResp.getDefaultInstance()));
                            // Outbound: prepends a Base 128 Varint length field so the peer's
                            // frame decoder can split messages, e.g. a 300-byte message becomes
                            // 302 bytes with the length prefix 0xAC02 (per the Netty source comment).
                            ch.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender());
                            // Outbound: ProtobufEncoder removes the need to encode outgoing
                            // protobuf messages by hand.
                            ch.pipeline().addLast(new ProtobufEncoder());
                            // Business logic handler.
                            ch.pipeline().addLast(new SubReqClientHandler());
                        }
                    });

            // Start the asynchronous connect and wait for it to complete.
            ChannelFuture f = b.connect(host, port).sync();

            // Wait until the client connection is closed.
            f.channel().closeFuture().sync();
        } finally {
            // Graceful shutdown, releasing the NIO thread group.
            group.shutdownGracefully();
        }
    }

    /**
     * Starts the client against {@code localhost}.
     *
     * @param args optional; {@code args[0]} is the server port (default 8080)
     * @throws Exception propagated from {@link #connect(String, int)}
     */
    public static void main(String[] args) throws Exception {
        new SubReqClient().connect("localhost", resolvePort(args));
    }

    /**
     * Reads the port from the first command-line argument.
     *
     * <p>The original code parsed the default value back into itself, so the
     * argument was never honoured; this reads {@code args[0]} instead.
     *
     * @param args command-line arguments, may be {@code null} or empty
     * @return the parsed port, or the default when absent or not a valid integer
     */
    private static int resolvePort(String[] args) {
        if (args == null || args.length == 0) {
            return DEFAULT_PORT;
        }
        try {
            return Integer.parseInt(args[0]);
        } catch (NumberFormatException e) {
            // Best effort: use the default port, but tell the user why.
            System.err.println("Invalid port '" + args[0] + "', falling back to " + DEFAULT_PORT);
            return DEFAULT_PORT;
        }
    }
}
SubReqClientHandler.java
package cn.xpleaf.subscribe;
import java.util.ArrayList;
import java.util.List;
import cn.xpleaf.protobuf.SubscribeReqProto;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
/**
 * Client-side business handler: sends a batch of subscribe requests when the
 * connection becomes active and prints every response it receives.
 */
public class SubReqClientHandler extends ChannelInboundHandlerAdapter {

    /**
     * Queues ten subscribe requests (ids 0 to 9) and sends them with a single flush.
     *
     * @param ctx channel context used to write the requests
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        int requestId = 0;
        while (requestId < 10) {
            ctx.write(subReq(requestId));
            requestId++;
        }
        ctx.flush();
    }

    /**
     * Builds a subscribe request with a fixed user, product and address list.
     *
     * @param subReqId id to store in the request
     * @return the built request message
     */
    private SubscribeReqProto.SubscribeReq subReq(int subReqId) {
        List<String> deliveryAddresses = new ArrayList<>();
        deliveryAddresses.add("NanJing YuHuaTai");
        deliveryAddresses.add("BeiJing LiuLiChange");
        deliveryAddresses.add("ShenZhen HongShuLin");

        return SubscribeReqProto.SubscribeReq.newBuilder()
                .setSubReqID(subReqId)
                .setUserName("xpleaf")
                .setProductName("Netty Book For Protobuf")
                .addAllAddress(deliveryAddresses)
                .build();
    }

    /**
     * Prints each message received from the server.
     *
     * @param ctx channel context (unused)
     * @param msg the message delivered by the preceding handlers in the pipeline
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        String line = "Service accept server subscribe response : [" + msg + "]";
        System.out.println(line);
    }

    /**
     * Flushes the channel once the current read batch is complete.
     *
     * @param ctx channel context to flush
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    /**
     * Prints the failure and closes the connection.
     *
     * @param ctx   channel context of the failing connection
     * @param cause the exception that was raised in the pipeline
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
測試
服務端輸出如下:
Service accept client subscribe req : [subReqID: 0
userName: "xpleaf"
productName: "Netty Book For Protobuf"
address: "NanJing YuHuaTai"
address: "BeiJing LiuLiChange"
address: "ShenZhen HongShuLin"
]
Service accept client subscribe req : [subReqID: 1
userName: "xpleaf"
productName: "Netty Book For Protobuf"
address: "NanJing YuHuaTai"
address: "BeiJing LiuLiChange"
address: "ShenZhen HongShuLin"
]
Service accept client subscribe req : [subReqID: 2
userName: "xpleaf"
productName: "Netty Book For Protobuf"
address: "NanJing YuHuaTai"
address: "BeiJing LiuLiChange"
address: "ShenZhen HongShuLin"
]
Service accept client subscribe req : [subReqID: 3
userName: "xpleaf"
productName: "Netty Book For Protobuf"
address: "NanJing YuHuaTai"
address: "BeiJing LiuLiChange"
address: "ShenZhen HongShuLin"
]
Service accept client subscribe req : [subReqID: 4
userName: "xpleaf"
productName: "Netty Book For Protobuf"
address: "NanJing YuHuaTai"
address: "BeiJing LiuLiChange"
address: "ShenZhen HongShuLin"
]
Service accept client subscribe req : [subReqID: 5
userName: "xpleaf"
productName: "Netty Book For Protobuf"
address: "NanJing YuHuaTai"
address: "BeiJing LiuLiChange"
address: "ShenZhen HongShuLin"
]
Service accept client subscribe req : [subReqID: 6
userName: "xpleaf"
productName: "Netty Book For Protobuf"
address: "NanJing YuHuaTai"
address: "BeiJing LiuLiChange"
address: "ShenZhen HongShuLin"
]
Service accept client subscribe req : [subReqID: 7
userName: "xpleaf"
productName: "Netty Book For Protobuf"
address: "NanJing YuHuaTai"
address: "BeiJing LiuLiChange"
address: "ShenZhen HongShuLin"
]
Service accept client subscribe req : [subReqID: 8
userName: "xpleaf"
productName: "Netty Book For Protobuf"
address: "NanJing YuHuaTai"
address: "BeiJing LiuLiChange"
address: "ShenZhen HongShuLin"
]
Service accept client subscribe req : [subReqID: 9
userName: "xpleaf"
productName: "Netty Book For Protobuf"
address: "NanJing YuHuaTai"
address: "BeiJing LiuLiChange"
address: "ShenZhen HongShuLin"
]
客戶端輸出如下:
Service accept server subscribe response : [subReqID: 0
respCode: 0
desc: "Netty book order succeed, 3 days later, sent to the designated address"
]
Service accept server subscribe response : [subReqID: 1
respCode: 0
desc: "Netty book order succeed, 3 days later, sent to the designated address"
]
Service accept server subscribe response : [subReqID: 2
respCode: 0
desc: "Netty book order succeed, 3 days later, sent to the designated address"
]
Service accept server subscribe response : [subReqID: 3
respCode: 0
desc: "Netty book order succeed, 3 days later, sent to the designated address"
]
Service accept server subscribe response : [subReqID: 4
respCode: 0
desc: "Netty book order succeed, 3 days later, sent to the designated address"
]
Service accept server subscribe response : [subReqID: 5
respCode: 0
desc: "Netty book order succeed, 3 days later, sent to the designated address"
]
Service accept server subscribe response : [subReqID: 6
respCode: 0
desc: "Netty book order succeed, 3 days later, sent to the designated address"
]
Service accept server subscribe response : [subReqID: 7
respCode: 0
desc: "Netty book order succeed, 3 days later, sent to the designated address"
]
Service accept server subscribe response : [subReqID: 8
respCode: 0
desc: "Netty book order succeed, 3 days later, sent to the designated address"
]
Service accept server subscribe response : [subReqID: 9
respCode: 0
desc: "Netty book order succeed, 3 days later, sent to the designated address"
]
Google Protobuf在Netty中的使用