【Netty入門】解決TCP粘包/分包的例項
回顧TCP粘包/分包問題的解決方法
1.訊息定長
2.在包尾都增加特殊字元進行分割
3.將訊息分為訊息頭和訊息體
針對這三種方法,下面我會分別舉例驗證
FixedLengthFrameDecoder類
對應第一種解決方法:訊息定長
(1)例1:服務端程式碼:
public class Server4 {
public static void main(String[] args) throws SigarException {
//boss執行緒監聽埠,worker執行緒負責資料讀寫
EventLoopGroup boss = new NioEventLoopGroup();
EventLoopGroup worker = new NioEventLoopGroup();
try{
//輔助啟動類
ServerBootstrap bootstrap = new ServerBootstrap();
//設定執行緒池
bootstrap.group(boss,worker);
//設定socket工廠
bootstrap.channel(NioServerSocketChannel.class);
//設定管道工廠
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
//獲取管道
ChannelPipeline pipeline = socketChannel.pipeline();
//定長解碼類
pipeline.addLast(new FixedLengthFrameDecoder(19));
//字串解碼類
pipeline.addLast(new StringDecoder());
//處理類
pipeline.addLast(new ServerHandler4());
}
});
//繫結埠
ChannelFuture future = bootstrap.bind(8866).sync();
System.out.println("server start ...... ");
//等待服務端監聽埠關閉
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
//優雅退出,釋放執行緒池資源
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}
class ServerHandler4 extends SimpleChannelInboundHandler <String>{
//用於記錄次數
private int count = 0;
//讀取客戶端傳送的資料
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
System.out.println("RESPONSE--------"+msg+";"+" @ "+ ++count);
}
//新客戶端接入
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelActive");
}
//客戶端斷開
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelInactive");
}
//異常
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
//關閉通道
ctx.channel().close();
//列印異常
cause.printStackTrace();
}
}
(2)例1:客戶端程式碼:
public class Client4 {
public static void main(String[] args) {
//worker負責讀寫資料
EventLoopGroup worker = new NioEventLoopGroup();
try {
//輔助啟動類
Bootstrap bootstrap = new Bootstrap();
//設定執行緒池
bootstrap.group(worker);
//設定socket工廠
bootstrap.channel(NioSocketChannel.class);
//設定管道
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
//獲取管道
ChannelPipeline pipeline = socketChannel.pipeline();
//定長解碼類
pipeline.addLast(new FixedLengthFrameDecoder(19));
//字串編碼類
pipeline.addLast(new StringEncoder());
//處理類
pipeline.addLast(new ClientHandler4());
}
});
//發起非同步連線操作
ChannelFuture futrue = bootstrap.connect(new InetSocketAddress("127.0.0.1",8866)).sync();
//等待客戶端鏈路關閉
futrue.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//優雅的退出,釋放NIO執行緒組
worker.shutdownGracefully();
}
}
}
class ClientHandler4 extends SimpleChannelInboundHandler<String> {
//接受服務端發來的訊息
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
System.out.println("server response : "+msg);
}
//與伺服器建立連線
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//給伺服器發訊息
String s = System.getProperty("line.separator");
//傳送50次訊息
for (int i = 0; i < 50; i++) {
ctx.channel().writeAndFlush(" I am client "+s);
}
}
//與伺服器斷開連線
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelInactive");
}
//異常
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
//關閉管道
ctx.channel().close();
//列印異常資訊
cause.printStackTrace();
}
}
例1服務端執行結果:
…………………….此處省略多行………………….
分析:從執行結果可以看出,符合我們的預期,並沒有TCP粘包問題,這是因為使用的定長解碼器的原因,我在此解釋一下例1client/server程式碼中新增加了幾個“陌生”的類,若之後再次出現,則不作解釋!
- FixedLengthFrameDecoder類:用於固定長度訊息的粘包分包處理,可以攜帶引數,我在程式碼中指定的引數為19,因為我要傳送的字元長度為19。
- StringDecoder類 :用於字串的解碼。
- StringEncoder類 :用於字串的編碼。
LineBasedFrameDecoder類
對應第二種解決方法:在包尾都增加特殊字元(行分隔符)進行分割
(1)例2:服務端程式碼:
public class Server4 {
public static void main(String[] args) throws SigarException {
//boss執行緒監聽埠,worker執行緒負責資料讀寫
EventLoopGroup boss = new NioEventLoopGroup();
EventLoopGroup worker = new NioEventLoopGroup();
try{
//輔助啟動類
ServerBootstrap bootstrap = new ServerBootstrap();
//設定執行緒池
bootstrap.group(boss,worker);
//設定socket工廠
bootstrap.channel(NioServerSocketChannel.class);
//設定管道工廠
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
//獲取管道
ChannelPipeline pipeline = socketChannel.pipeline();
//行分隔符解碼類
pipeline.addLast(new LineBasedFrameDecoder(1024));
//字串解碼類
pipeline.addLast(new StringDecoder());
//處理類
pipeline.addLast(new ServerHandler4());
}
});
//繫結埠
ChannelFuture future = bootstrap.bind(8866).sync();
System.out.println("server start ...... ");
//等待服務端監聽埠關閉
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
//優雅退出,釋放執行緒池資源
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}
class ServerHandler4 extends SimpleChannelInboundHandler <String>{
//用於記錄次數
private int count = 0;
//讀取客戶端傳送的資料
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
System.out.println("RESPONSE--------"+msg+";"+" @ "+ ++count);
}
//新客戶端接入
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelActive");
}
//客戶端斷開
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelInactive");
}
//異常
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
//關閉通道
ctx.channel().close();
//列印異常
cause.printStackTrace();
}
}
(2)例2:客戶端程式碼:
public class Client4 {
public static void main(String[] args) {
//worker負責讀寫資料
EventLoopGroup worker = new NioEventLoopGroup();
try {
//輔助啟動類
Bootstrap bootstrap = new Bootstrap();
//設定執行緒池
bootstrap.group(worker);
//設定socket工廠
bootstrap.channel(NioSocketChannel.class);
//設定管道
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
//獲取管道
ChannelPipeline pipeline = socketChannel.pipeline();
//行分隔符解碼類
pipeline.addLast(new LineBasedFrameDecoder(1024));
//字串編碼類
pipeline.addLast(new StringEncoder());
//處理類
pipeline.addLast(new ClientHandler4());
}
});
//發起非同步連線操作
ChannelFuture futrue = bootstrap.connect(new InetSocketAddress("127.0.0.1",8866)).sync();
//等待客戶端鏈路關閉
futrue.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//優雅的退出,釋放NIO執行緒組
worker.shutdownGracefully();
}
}
}
class ClientHandler4 extends SimpleChannelInboundHandler<String> {
//接受服務端發來的訊息
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
System.out.println("server response : "+msg);
}
//與伺服器建立連線
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//給伺服器發訊息
String s = System.getProperty("line.separator");
//傳送50次訊息
for (int i = 0; i < 50; i++) {
ctx.channel().writeAndFlush(" I am client "+s);
}
}
//與伺服器斷開連線
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelInactive");
}
//異常
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
//關閉管道
ctx.channel().close();
//列印異常資訊
cause.printStackTrace();
}
}
例2服務端執行結果:
…………………….此處省略多行………………….
分析:從執行結果可以看出沒有TCP粘包問題了,細心的你或許已經發現程式碼中新出現了一個LineBasedFrameDecoder類,它可以攜帶引數,我指定的引數為1024,含義為在每1024個位元組中尋找換行符,若有,就傳送訊息,否則繼續尋找。
DelimiterBasedFrameDecoder類
對應第二種解決方法:在包尾都增加特殊字元(#)進行分割
例3:服務端程式碼,和例2服務端程式碼類似,由於篇幅有限,我就僅僅指出它們不一樣的地方了!
將例2服務端 第23行 和第24行 程式碼修改為
//自定義分隔符解碼類
pipeline.addLast(newDelimiterBasedFrameDecoder(1024,Unpooled.copiedBuffer("#".getBytes())));
例3:客戶端程式碼:(和例2客戶端程式碼的不同之處)
將例2客戶端 第24行 和第25行 程式碼修改為
//自定義分隔符解碼類
pipeline.addLast(newDelimiterBasedFrameDecoder(1024,Unpooled.copiedBuffer("#".getBytes())));
再將例2客戶端 第63行 程式碼修改為
ctx.channel().writeAndFlush(" I am client "+"#");
例3服務端執行結果:
…………………….此處省略多行………………….
分析:從執行結果可以看出TCP粘包問題解決了!程式碼中新出現了一個DelimiterBasedFrameDecoder類,它可以攜帶引數,我指定的引數為(1024,Unpooled.copiedBuffer(“#”.getBytes()))),含義為在每1024個位元組中尋找#,若有,就傳送訊息,否則繼續尋找。
MyRequestDecoder自定義類
對應第三種方法:將訊息分為訊息頭和訊息體
對於訊息頭和訊息體,有三種情況分別如下:
有頭部的拆包與粘包:
lengthFieldOffset = 2 長度欄位偏移量 ( = 外部頭部Header 1的長度)
lengthFieldLength = 3 長度欄位佔用位元組數
lengthAdjustment = 0
initialBytesToStrip = 0
長度欄位在前且有頭部的拆包與粘包:
lengthFieldOffset = 0 長度欄位偏移量
lengthFieldLength = 3 長度欄位佔用位元組數
lengthAdjustment = 2 ( Header 1 的長度)
initialBytesToStrip = 0
- 多擴充套件頭部的拆包與粘包:
lengthFieldOffset = 1 長度欄位偏移量(=頭HDR1的長度)
lengthFieldLength = 2 長度欄位佔用位元組數
lengthAdjustment = 1 調整長度(= 頭HDR2的長度)
initialBytesToStrip = 3 排除的偏移量(= the length of HDR1 + LEN)
舉一個簡單的例子
例4:
import netty.EnDecode.Request;
/**
* 請求解碼器
* <pre>
* 資料包格式
* +——----——+——-----——+——----——+——----——+——-----——+
* | 包頭 | 模組號 | 命令號 | 長度 | 資料 |
* +——----——+——-----——+——----——+——----——+——-----——+
* </pre>
* 包頭4位元組
* 模組號2位元組short
* 命令號2位元組short
* 長度4位元組(描述資料部分位元組長度)
*/
public class MyRequestDecoder extends FrameDecoder{
//資料包基本長度
public static final int BASE_LENTH = 4 + 2 + 2 + 4;
@Override
protected Object decode(ChannelHandlerContext arg0, Channel arg1, ChannelBuffer buffer) throws Exception {
//可讀長度必須大於基本長度
if(buffer.readableBytes() >= BASE_LENTH){
//防止socket位元組流攻擊
if(buffer.readableBytes() > 2048){
buffer.skipBytes(buffer.readableBytes());
}
//記錄包頭開始的index
int beginReader;
while(true){
beginReader = buffer.readerIndex();
buffer.markReaderIndex();
if(buffer.readInt() == -32523523){
break;
}
//未讀到包頭,略過一個位元組
buffer.resetReaderIndex();
buffer.readByte();
//長度又變得不滿足
if(buffer.readableBytes() < BASE_LENTH){
return null;
}
}
//模組號
short module = buffer.readShort();
//命令號
short cmd = buffer.readShort();
//長度
int length = buffer.readInt();
//判斷請求資料包資料是否到齊
if(buffer.readableBytes() < length){
//還原讀指標
buffer.readerIndex(beginReader);
return null;
}
//讀取data資料
byte[] data = new byte[length];
buffer.readBytes(data);
Request request = new Request();
request.setModule(module);
request.setCmd(cmd);
request.setData(data);
//繼續往下傳遞
return request;
}
//資料包不完整,需要等待後面的包來
return null;
}
}
本人才疏學淺,如有錯誤,請指出
謝謝!