Netty之解決TCP粘包拆包(自定義協議)
1、什麼是粘包/拆包
一般所謂的TCP粘包是在一次接收資料不能完全地體現一個完整的訊息資料。TCP通訊為何存在粘包呢?主要原因是TCP是以流的方式來處理資料,再加上網路上MTU的往往小於在應用處理的訊息資料,所以就會引發一次接收的資料無法滿足訊息的需要,導致粘包的存在。處理粘包的唯一方法就是制定應用層的資料通訊協議,通過協議來規範現有接收的資料是否滿足訊息資料的需要。
2、解決辦法
2.1、訊息定長,報文大小固定長度,不夠空格補全,傳送和接收方遵循相同的約定,這樣即使粘包了通過接收方程式設計實現獲取定長報文也能區分。
2.2、包尾新增特殊分隔符,例如每條報文結束都添加回車換行符(例如FTP協議)或者指定特殊字元作為報文分隔符,接收方通過特殊分隔符切分報文區分。
2.3、將訊息分為訊息頭和訊息體,訊息頭中包含表示資訊的總長度(或者訊息體長度)的欄位
3、自定義協議,來實現TCP的粘包/拆包問題
3.0 自定義協議,開始標記
3.1 自定義協議的介紹
3.2 自定義協議的類的封裝
3.3 自定義協議的編碼器
3.4 自定義協議的解碼器
4、協議相關的實現
4.1 協議的封裝
import java.util.Arrays; /** * <pre> * 自己定義的協議 * 資料包格式 * +——----——+——-----——+——----——+ * |協議開始標誌| 長度 | 資料 | * +——----——+——-----——+——----——+ * 1.協議開始標誌head_data,為int型別的資料,16進製表示為0X76 * 2.傳輸資料的長度contentLength,int型別 * 3.要傳輸的資料 * </pre> */ public class SmartCarProtocol { /** * 訊息的開頭的資訊標誌 */ private int head_data = ConstantValue.HEAD_DATA; /** * 訊息的長度 */ private int contentLength; /** * 訊息的內容 */ private byte[] content; /** * 用於初始化,SmartCarProtocol * * @param contentLength * 協議裡面,訊息資料的長度 * @param content * 協議裡面,訊息的資料 */ public SmartCarProtocol(int contentLength, byte[] content) { this.contentLength = contentLength; this.content = content; } public int getHead_data() { return head_data; } public int getContentLength() { return contentLength; } public void setContentLength(int contentLength) { this.contentLength = contentLength; } public byte[] getContent() { return content; } public void setContent(byte[] content) { this.content = content; } @Override public String toString() { return "SmartCarProtocol [head_data=" + head_data + ", contentLength=" + contentLength + ", content=" + Arrays.toString(content) + "]"; } }
4.2 協議的編碼器
/** * <pre> * 自己定義的協議 * 資料包格式 * +——----——+——-----——+——----——+ * |協議開始標誌| 長度 | 資料 | * +——----——+——-----——+——----——+ * 1.協議開始標誌head_data,為int型別的資料,16進製表示為0X76 * 2.傳輸資料的長度contentLength,int型別 * 3.要傳輸的資料 * </pre> */ public class SmartCarEncoder extends MessageToByteEncoder<SmartCarProtocol> { @Override protected void encode(ChannelHandlerContext tcx, SmartCarProtocol msg, ByteBuf out) throws Exception { // 寫入訊息SmartCar的具體內容 // 1.寫入訊息的開頭的資訊標誌(int型別) out.writeInt(msg.getHead_data()); // 2.寫入訊息的長度(int 型別) out.writeInt(msg.getContentLength()); // 3.寫入訊息的內容(byte[]型別) out.writeBytes(msg.getContent()); } }
4.3 協議的解碼器
import java.util.List;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
/**
* <pre>
* 自己定義的協議
* 資料包格式
* +——----——+——-----——+——----——+
* |協議開始標誌| 長度 | 資料 |
* +——----——+——-----——+——----——+
* 1.協議開始標誌head_data,為int型別的資料,16進製表示為0X76
* 2.傳輸資料的長度contentLength,int型別
* 3.要傳輸的資料,長度不應該超過2048,防止socket流的攻擊
* </pre>
*/
public class SmartCarDecoder extends ByteToMessageDecoder {
/**
* <pre>
* 協議開始的標準head_data,int型別,佔據4個位元組.
* 表示資料的長度contentLength,int型別,佔據4個位元組.
* </pre>
*/
public final int BASE_LENGTH = 4 + 4;
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf buffer,
List<Object> out) throws Exception {
// 可讀長度必須大於基本長度
if (buffer.readableBytes() >= BASE_LENGTH) {
// 防止socket位元組流攻擊
// 防止,客戶端傳來的資料過大
// 因為,太大的資料,是不合理的
if (buffer.readableBytes() > 2048) {
buffer.skipBytes(buffer.readableBytes());
}
// 記錄包頭開始的index
int beginReader;
while (true) {
// 獲取包頭開始的index
beginReader = buffer.readerIndex();
// 標記包頭開始的index
buffer.markReaderIndex();
// 讀到了協議的開始標誌,結束while迴圈
if (buffer.readInt() == ConstantValue.HEAD_DATA) {
break;
}
// 未讀到包頭,略過一個位元組
// 每次略過,一個位元組,去讀取,包頭資訊的開始標記
buffer.resetReaderIndex();
buffer.readByte();
// 當略過,一個位元組之後,
// 資料包的長度,又變得不滿足
// 此時,應該結束。等待後面的資料到達
if (buffer.readableBytes() < BASE_LENGTH) {
return;
}
}
// 訊息的長度
int length = buffer.readInt();
// 判斷請求資料包資料是否到齊
if (buffer.readableBytes() < length) {
// 還原讀指標
buffer.readerIndex(beginReader);
return;
}
// 讀取data資料
byte[] data = new byte[length];
buffer.readBytes(data);
SmartCarProtocol protocol = new SmartCarProtocol(data.length, data);
out.add(protocol);
}
}
}
4.4 服務端加入協議的編/解碼器
4.5 客戶端加入協議的編/解碼器
5、服務端的實現
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
public class Server {
public Server() {
}
public void bind(int port) throws Exception {
// 配置NIO執行緒組
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
// 伺服器輔助啟動類配置
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChildChannelHandler())//
.option(ChannelOption.SO_BACKLOG, 1024) // 設定tcp緩衝區 // (5)
.childOption(ChannelOption.SO_KEEPALIVE, true); // (6)
// 繫結埠 同步等待繫結成功
ChannelFuture f = b.bind(port).sync(); // (7)
// 等到服務端監聽埠關閉
f.channel().closeFuture().sync();
} finally {
// 優雅釋放執行緒資源
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
/**
* 網路事件處理器
*/
private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// 新增自定義協議的編解碼工具
ch.pipeline().addLast(new SmartCarEncoder());
ch.pipeline().addLast(new SmartCarDecoder());
// 處理網路IO
ch.pipeline().addLast(new ServerHandler());
}
}
public static void main(String[] args) throws Exception {
new Server().bind(9999);
}
}
6、服務端Handler的實現
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
public class ServerHandler extends ChannelHandlerAdapter {
// 用於獲取客戶端傳送的資訊
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
// 用於獲取客戶端發來的資料資訊
SmartCarProtocol body = (SmartCarProtocol) msg;
System.out.println("Server接受的客戶端的資訊 :" + body.toString());
// 會寫資料給客戶端
String str = "Hi I am Server ...";
SmartCarProtocol response = new SmartCarProtocol(str.getBytes().length,
str.getBytes());
// 當服務端完成寫操作後,關閉與客戶端的連線
ctx.writeAndFlush(response);
// .addListener(ChannelFutureListener.CLOSE);
// 當有寫操作時,不需要手動釋放msg的引用
// 當只有讀操作時,才需要手動釋放msg的引用
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
// cause.printStackTrace();
ctx.close();
}
}
7、客戶端的實現
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
public class Client {
/**
* 連線伺服器
*
* @param port
* @param host
* @throws Exception
*/
public void connect(int port, String host) throws Exception {
// 配置客戶端NIO執行緒組
EventLoopGroup group = new NioEventLoopGroup();
try {
// 客戶端輔助啟動類 對客戶端配置
Bootstrap b = new Bootstrap();
b.group(group)//
.channel(NioSocketChannel.class)//
.option(ChannelOption.TCP_NODELAY, true)//
.handler(new MyChannelHandler());//
// 非同步連結伺服器 同步等待連結成功
ChannelFuture f = b.connect(host, port).sync();
// 等待連結關閉
f.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
System.out.println("客戶端優雅的釋放了執行緒資源...");
}
}
/**
* 網路事件處理器
*/
private class MyChannelHandler extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// 新增自定義協議的編解碼工具
ch.pipeline().addLast(new SmartCarEncoder());
ch.pipeline().addLast(new SmartCarDecoder());
// 處理網路IO
ch.pipeline().addLast(new ClientHandler());
}
}
public static void main(String[] args) throws Exception {
new Client().connect(9999, "127.0.0.1");
}
}
8、客戶端Handler的實現
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.ReferenceCountUtil;
//用於讀取客戶端發來的資訊
public class ClientHandler extends ChannelHandlerAdapter {
// 客戶端與服務端,連線成功的售後
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 傳送SmartCar協議的訊息
// 要傳送的資訊
String data = "I am client ...";
// 獲得要傳送資訊的位元組陣列
byte[] content = data.getBytes();
// 要傳送資訊的長度
int contentLength = content.length;
SmartCarProtocol protocol = new SmartCarProtocol(contentLength, content);
ctx.writeAndFlush(protocol);
}
// 只是讀資料,沒有寫資料的話
// 需要自己手動的釋放的訊息
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
try {
// 用於獲取客戶端發來的資料資訊
SmartCarProtocol body = (SmartCarProtocol) msg;
System.out.println("Client接受的客戶端的資訊 :" + body.toString());
} finally {
ReferenceCountUtil.release(msg);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
ctx.close();
}
}
9、參考的部落格地址
http://www.cnblogs.com/whthomas/p/netty-custom-protocol.html
http://www.cnblogs.com/fanguangdexiaoyuer/p/6131042.html