1. 程式人生 > >Netty之解決TCP粘包拆包(自定義協議)

Netty之解決TCP粘包拆包(自定義協議)

1、什麼是粘包/拆包

       一般所謂的TCP粘包是在一次接收資料不能完全地體現一個完整的訊息資料。TCP通訊為何存在粘包呢?主要原因是TCP是以流的方式來處理資料,再加上網路上MTU的往往小於在應用處理的訊息資料,所以就會引發一次接收的資料無法滿足訊息的需要,導致粘包的存在。處理粘包的唯一方法就是制定應用層的資料通訊協議,通過協議來規範現有接收的資料是否滿足訊息資料的需要。

2、解決辦法

     2.1、訊息定長,報文大小固定長度,不夠空格補全,傳送和接收方遵循相同的約定,這樣即使粘包了通過接收方程式設計實現獲取定長報文也能區分。

     2.2、包尾新增特殊分隔符,例如每條報文結束都添加回車換行符(例如FTP協議)或者指定特殊字元作為報文分隔符,接收方通過特殊分隔符切分報文區分。

     2.3、將訊息分為訊息頭和訊息體,訊息頭中包含表示資訊的總長度(或者訊息體長度)的欄位

3、自定義協議,來實現TCP的粘包/拆包問題

      3.0  自定義協議,開始標記           

              

      3.1  自定義協議的介紹

             

      3.2  自定義協議的類的封裝

             

      3.3  自定義協議的編碼器

             

      3.4  自定義協議的解碼器

          

4、協議相關的實現

      4.1  協議的封裝

import java.util.Arrays;

/**
 * <pre>
 * 自己定義的協議
 *  資料包格式
 * +——----——+——-----——+——----——+
 * |協議開始標誌|  長度             |   資料       |
 * +——----——+——-----——+——----——+
 * 1.協議開始標誌head_data,為int型別的資料,16進製表示為0X76
 * 2.傳輸資料的長度contentLength,int型別
 * 3.要傳輸的資料
 * </pre>
 */
public class SmartCarProtocol {
	/**
	 * 訊息的開頭的資訊標誌
	 */
	private int head_data = ConstantValue.HEAD_DATA;
	/**
	 * 訊息的長度
	 */
	private int contentLength;
	/**
	 * 訊息的內容
	 */
	private byte[] content;

	/**
	 * 用於初始化,SmartCarProtocol
	 * 
	 * @param contentLength
	 *            協議裡面,訊息資料的長度
	 * @param content
	 *            協議裡面,訊息的資料
	 */
	public SmartCarProtocol(int contentLength, byte[] content) {
		this.contentLength = contentLength;
		this.content = content;
	}

	public int getHead_data() {
		return head_data;
	}

	public int getContentLength() {
		return contentLength;
	}

	public void setContentLength(int contentLength) {
		this.contentLength = contentLength;
	}

	public byte[] getContent() {
		return content;
	}

	public void setContent(byte[] content) {
		this.content = content;
	}

	@Override
	public String toString() {
		return "SmartCarProtocol [head_data=" + head_data + ", contentLength="
				+ contentLength + ", content=" + Arrays.toString(content) + "]";
	}

}

      4.2  協議的編碼器

/**
 * <pre>
 * 自己定義的協議
 *  資料包格式
 * +——----——+——-----——+——----——+
 * |協議開始標誌|  長度             |   資料       |
 * +——----——+——-----——+——----——+
 * 1.協議開始標誌head_data,為int型別的資料,16進製表示為0X76
 * 2.傳輸資料的長度contentLength,int型別
 * 3.要傳輸的資料
 * </pre>
 */
public class SmartCarEncoder extends MessageToByteEncoder<SmartCarProtocol> {

	@Override
	protected void encode(ChannelHandlerContext tcx, SmartCarProtocol msg,
			ByteBuf out) throws Exception {
		// 寫入訊息SmartCar的具體內容
		// 1.寫入訊息的開頭的資訊標誌(int型別)
		out.writeInt(msg.getHead_data());
		// 2.寫入訊息的長度(int 型別)
		out.writeInt(msg.getContentLength());
		// 3.寫入訊息的內容(byte[]型別)
		out.writeBytes(msg.getContent());
	}
}

      4.3  協議的解碼器

import java.util.List;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;

/**
 * <pre>
 * 自己定義的協議
 *  資料包格式
 * +——----——+——-----——+——----——+
 * |協議開始標誌|  長度             |   資料       |
 * +——----——+——-----——+——----——+
 * 1.協議開始標誌head_data,為int型別的資料,16進製表示為0X76
 * 2.傳輸資料的長度contentLength,int型別
 * 3.要傳輸的資料,長度不應該超過2048,防止socket流的攻擊
 * </pre>
 */
public class SmartCarDecoder extends ByteToMessageDecoder {

	/**
	 * <pre>
	 * 協議開始的標準head_data,int型別,佔據4個位元組.
	 * 表示資料的長度contentLength,int型別,佔據4個位元組.
	 * </pre>
	 */
	public final int BASE_LENGTH = 4 + 4;

	@Override
	protected void decode(ChannelHandlerContext ctx, ByteBuf buffer,
			List<Object> out) throws Exception {
		// 可讀長度必須大於基本長度
		if (buffer.readableBytes() >= BASE_LENGTH) {
			// 防止socket位元組流攻擊
			// 防止,客戶端傳來的資料過大
			// 因為,太大的資料,是不合理的
			if (buffer.readableBytes() > 2048) {
				buffer.skipBytes(buffer.readableBytes());
			}

			// 記錄包頭開始的index
			int beginReader;

			while (true) {
				// 獲取包頭開始的index
				beginReader = buffer.readerIndex();
				// 標記包頭開始的index
				buffer.markReaderIndex();
				// 讀到了協議的開始標誌,結束while迴圈
				if (buffer.readInt() == ConstantValue.HEAD_DATA) {
					break;
				}

				// 未讀到包頭,略過一個位元組
				// 每次略過,一個位元組,去讀取,包頭資訊的開始標記
				buffer.resetReaderIndex();
				buffer.readByte();

				// 當略過,一個位元組之後,
				// 資料包的長度,又變得不滿足
				// 此時,應該結束。等待後面的資料到達
				if (buffer.readableBytes() < BASE_LENGTH) {
					return;
				}
			}

			// 訊息的長度

			int length = buffer.readInt();
			// 判斷請求資料包資料是否到齊
			if (buffer.readableBytes() < length) {
				// 還原讀指標
				buffer.readerIndex(beginReader);
				return;
			}

			// 讀取data資料
			byte[] data = new byte[length];
			buffer.readBytes(data);

			SmartCarProtocol protocol = new SmartCarProtocol(data.length, data);
			out.add(protocol);
		}
	}

}

      4.4  服務端加入協議的編/解碼器

            

      4.5  客戶端加入協議的編/解碼器

          

5、服務端的實現

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

public class Server {

	public Server() {
	}

	public void bind(int port) throws Exception {
		// 配置NIO執行緒組
		EventLoopGroup bossGroup = new NioEventLoopGroup();
		EventLoopGroup workerGroup = new NioEventLoopGroup();
		try {
			// 伺服器輔助啟動類配置
			ServerBootstrap b = new ServerBootstrap();
			b.group(bossGroup, workerGroup)
					.channel(NioServerSocketChannel.class)
					.handler(new LoggingHandler(LogLevel.INFO))
					.childHandler(new ChildChannelHandler())//
					.option(ChannelOption.SO_BACKLOG, 1024) // 設定tcp緩衝區 // (5)
					.childOption(ChannelOption.SO_KEEPALIVE, true); // (6)
			// 繫結埠 同步等待繫結成功
			ChannelFuture f = b.bind(port).sync(); // (7)
			// 等到服務端監聽埠關閉
			f.channel().closeFuture().sync();
		} finally {
			// 優雅釋放執行緒資源
			workerGroup.shutdownGracefully();
			bossGroup.shutdownGracefully();
		}
	}

	/**
	 * 網路事件處理器
	 */
	private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
		@Override
		protected void initChannel(SocketChannel ch) throws Exception {
			// 新增自定義協議的編解碼工具
			ch.pipeline().addLast(new SmartCarEncoder());
			ch.pipeline().addLast(new SmartCarDecoder());
			// 處理網路IO
			ch.pipeline().addLast(new ServerHandler());
		}
	}

	public static void main(String[] args) throws Exception {
		new Server().bind(9999);
	}
}
6、服務端Handler的實現
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

public class ServerHandler extends ChannelHandlerAdapter {
	// 用於獲取客戶端傳送的資訊
	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg)
			throws Exception {
		// 用於獲取客戶端發來的資料資訊
		SmartCarProtocol body = (SmartCarProtocol) msg;
		System.out.println("Server接受的客戶端的資訊 :" + body.toString());

		// 會寫資料給客戶端
		String str = "Hi I am Server ...";
		SmartCarProtocol response = new SmartCarProtocol(str.getBytes().length,
				str.getBytes());
		// 當服務端完成寫操作後,關閉與客戶端的連線
		ctx.writeAndFlush(response);
		// .addListener(ChannelFutureListener.CLOSE);

		// 當有寫操作時,不需要手動釋放msg的引用
		// 當只有讀操作時,才需要手動釋放msg的引用
	}

	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
			throws Exception {
		// cause.printStackTrace();
		ctx.close();
	}
}
7、客戶端的實現
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class Client {

	/**
	 * 連線伺服器
	 * 
	 * @param port
	 * @param host
	 * @throws Exception
	 */
	public void connect(int port, String host) throws Exception {
		// 配置客戶端NIO執行緒組
		EventLoopGroup group = new NioEventLoopGroup();
		try {
			// 客戶端輔助啟動類 對客戶端配置
			Bootstrap b = new Bootstrap();
			b.group(group)//
					.channel(NioSocketChannel.class)//
					.option(ChannelOption.TCP_NODELAY, true)//
					.handler(new MyChannelHandler());//
			// 非同步連結伺服器 同步等待連結成功
			ChannelFuture f = b.connect(host, port).sync();

			// 等待連結關閉
			f.channel().closeFuture().sync();

		} finally {
			group.shutdownGracefully();
			System.out.println("客戶端優雅的釋放了執行緒資源...");
		}

	}

	/**
	 * 網路事件處理器
	 */
	private class MyChannelHandler extends ChannelInitializer<SocketChannel> {
		@Override
		protected void initChannel(SocketChannel ch) throws Exception {
			// 新增自定義協議的編解碼工具
			ch.pipeline().addLast(new SmartCarEncoder());
			ch.pipeline().addLast(new SmartCarDecoder());
			// 處理網路IO
			ch.pipeline().addLast(new ClientHandler());
		}

	}

	public static void main(String[] args) throws Exception {
		new Client().connect(9999, "127.0.0.1");

	}

}
8、客戶端Handler的實現
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.ReferenceCountUtil;

//用於讀取客戶端發來的資訊
public class ClientHandler extends ChannelHandlerAdapter {

	// 客戶端與服務端,連線成功的售後
	@Override
	public void channelActive(ChannelHandlerContext ctx) throws Exception {
		// 傳送SmartCar協議的訊息
		// 要傳送的資訊
		String data = "I am client ...";
		// 獲得要傳送資訊的位元組陣列
		byte[] content = data.getBytes();
		// 要傳送資訊的長度
		int contentLength = content.length;

		SmartCarProtocol protocol = new SmartCarProtocol(contentLength, content);

		ctx.writeAndFlush(protocol);
	}

	// 只是讀資料,沒有寫資料的話
	// 需要自己手動的釋放的訊息
	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg)
			throws Exception {
		try {
			// 用於獲取客戶端發來的資料資訊
			SmartCarProtocol body = (SmartCarProtocol) msg;
			System.out.println("Client接受的客戶端的資訊 :" + body.toString());

		} finally {
			ReferenceCountUtil.release(msg);
		}
	}

	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
			throws Exception {
		ctx.close();
	}

}
9、參考的部落格地址
http://www.cnblogs.com/whthomas/p/netty-custom-protocol.html
http://www.cnblogs.com/fanguangdexiaoyuer/p/6131042.html