1. 程式人生 > >粘包與分包問題的出現及解決

粘包與分包問題的出現及解決

1、粘包出現的原因:服務端與客戶端沒有約定好要使用的資料結構。Socket Client實際是將資料包傳送到一個快取buffer中,通過buffer刷到資料鏈路層。因服務端接收資料包時,不能斷定資料包1何時結束,就有可能出現數據包2的部分資料結合資料包1傳送出去,導致伺服器讀取資料包1時包含了資料包2的資料。這種現象稱為粘包。

2、案例展示:(1)、服務端程式碼如下,具體註釋說明

package com.server;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

/**
 * Netty5服務端
 * @author zhengzx
 *
 */
public class ServerSocket {
	public static void main(String[] args) {
		
		//建立服務類
		ServerBootstrap serverBootstrap = new ServerBootstrap();
		
		//boss和worker
		NioEventLoopGroup boss = new NioEventLoopGroup();
		NioEventLoopGroup worker = new NioEventLoopGroup();
		
		try {
			//設定執行緒池
			serverBootstrap.group(boss,worker);
			//設定socket工廠,Channel 是對 Java 底層 Socket 連線的抽象
			serverBootstrap.channel(NioServerSocketChannel.class);
			//設定管道工廠
			serverBootstrap.childHandler(new ChannelInitializer<Channel>() {

				@Override
				protected void initChannel(Channel ch) throws Exception {
					//設定後臺轉換器(二進位制轉換字串)
					ch.pipeline().addLast(new StringDecoder());
					ch.pipeline().addLast(new StringEncoder());
					ch.pipeline().addLast(new ServerSocketHandler());
				}
			});
			
			//設定TCP引數
			serverBootstrap.option(ChannelOption.SO_BACKLOG, 2048);//連線緩衝池大小
			serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);//維持連線的活躍,清除死連線
			serverBootstrap.childOption(ChannelOption.TCP_NODELAY, true);//關閉超時連線
			
			ChannelFuture future = serverBootstrap.bind(10010);//繫結埠
			System.out.println("服務端啟動");
			
			//等待服務端關閉
			future.channel().closeFuture().sync();
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			//釋放資源
			boss.shutdownGracefully();
			worker.shutdownGracefully();
		}
		
	}
}

(2)、ServerSocketHandler處理類展示:

package com.server;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

public class ServerSocketHandler extends SimpleChannelInboundHandler<String>{

	@Override
	protected void messageReceived(ChannelHandlerContext ctx, String msg) throws Exception {
		System.out.println(msg);
	}
	
}

(3)、客戶端傳送請求程式碼展示:

package com.client;

import java.io.IOException;
import java.net.Socket;
import java.net.UnknownHostException;

public class Client {
	public static void main(String[] args) throws UnknownHostException, IOException {
		//建立連線
		Socket socket = new Socket("127.0.0.1", 10010);
		//迴圈傳送請求
		for(int i=0;i<1000;i++){
			socket.getOutputStream().write("hello".getBytes());
		}	
		//關閉連線
		socket.close();
	}
}

(4)、列印結果。(正常情況應為一行一個hello列印)
        

3、分包:資料包資料被分開一部分發送出去,服務端一次讀取資料時可能讀取到完整資料包的一部分,剩餘部分被第二次讀取。具體情況如下圖展示:
 

4、解決辦法1:定義一個穩定的結構。
     (1)、包頭+length+資料包:客戶端程式碼展示:包頭——>用來防止socket攻擊,length——>用來獲取資料包的長度。

package com.server;

import java.io.IOException;
import java.net.Socket;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;

import org.omg.CORBA.PRIVATE_MEMBER;
import org.omg.CORBA.PUBLIC_MEMBER;

/**
 * @category 通過長度+資料包的方式解決粘包分包問題
 * @author zhengzx
 *
 */
public class Client {
	//定義包頭
	public static int BAO = 24323455;
	public static void main(String[] args) throws UnknownHostException, IOException {
		//建立連線
		Socket socket = new Socket("127.0.0.1", 10010);
		//客戶端傳送的訊息
		String msg = "hello";
		//獲取訊息的位元組碼
		byte[] bytes = msg.getBytes();
		//初始化buffer的長度:4+4表示包頭長度+存放資料長度的整數的長度
		ByteBuffer buffer = ByteBuffer.allocate(8+bytes.length);
		//將長度和資料存入buffer中
		buffer.putInt(BAO);
		buffer.putInt(bytes.length);
		buffer.put(bytes);
		//獲取緩衝區中的資料
		byte[] array = buffer.array();
		//迴圈傳送請求
		for(int i=0;i<1000;i++){
			socket.getOutputStream().write(array);
		}	
		//關閉連線
		socket.close();
	}
}

(2)、服務端:需要注意的是,添加了MyDecoder類,此類具體下面介紹

package com.server;

import java.net.InetSocketAddress;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.jboss.netty.handler.codec.string.StringDecoder;
import org.jboss.netty.handler.codec.string.StringEncoder;

public class Server {

	public static void main(String[] args) {
		//服務類
		ServerBootstrap bootstrap = new ServerBootstrap();
		
		//boss執行緒監聽埠,worker執行緒負責資料讀寫
		ExecutorService boss = Executors.newCachedThreadPool();
		ExecutorService worker = Executors.newCachedThreadPool();
		
		//設定niosocket工廠
		bootstrap.setFactory(new NioServerSocketChannelFactory(boss, worker));
		
		//設定管道的工廠
		bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
			
			@Override
			public ChannelPipeline getPipeline() throws Exception {
				ChannelPipeline pipeline = Channels.pipeline();
				pipeline.addLast("decoder", new MyDecoder());
				pipeline.addLast("handler1", new HelloHandler());
				return pipeline;
			}
		});
		
		bootstrap.bind(new InetSocketAddress(10101));
		
		System.out.println("start!!!");
	}

}

(3)、MyDecode類,需要繼承FrameDecoder類。此類中用ChannelBuffer快取沒有讀取的資料包,等接收到第二次傳送的資料包時,會將此資料包與快取的資料包進行拼接處理。
           當return一個String時,FarmedDecoder通過判斷返回型別,呼叫相應的sendUpStream(event)向下傳遞資料。原始碼展示:

    public static void fireMessageReceived(
            ChannelHandlerContext ctx, Object message, SocketAddress remoteAddress) {
        ctx.sendUpstream(new UpstreamMessageEvent(
                ctx.getChannel(), message, remoteAddress));
    }

           當返回null時,會進行break,不處理資料包中的資料,原始碼展示:

while (cumulation.readable()) {
            int oldReaderIndex = cumulation.readerIndex();
            Object frame = decode(context, channel, cumulation);
            if (frame == null) {
                if (oldReaderIndex == cumulation.readerIndex()) {
                    // Seems like more data is required.
                    // Let us wait for the next notification.
                    break;
                } else {
                    // Previous data has been discarded.
                    // Probably it is reading on.
                    continue;
                }
            }

 我們自己寫的MyDecoder類,程式碼展示:(包含socket攻擊的校驗)

package com.server;

import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.handler.codec.frame.FrameDecoder;

public class MyDecoder extends FrameDecoder{


	@Override
	protected Object decode(ChannelHandlerContext arg0, Channel arg1, ChannelBuffer buffer) throws Exception {
		//buffer.readableBytes獲取緩衝區中的資料 需要 大於基本長度
		if(buffer.readableBytes() > 4) {
			//防止socket攻擊,當緩衝區資料大於2048時,清除資料。
			if(buffer.readableBytes() > 2048) {
				buffer.skipBytes(buffer.readableBytes());
			}
			//迴圈獲取包頭,確定資料包的開始位置
			while(true) {
				buffer.markReaderIndex();
				if(buffer.readInt() == Client.BAO) {
					break;
				}
				//只讀取一個位元組
				buffer.resetReaderIndex();
				buffer.readByte();
				
				if(buffer.readableBytes() < 4) {
					return null;
				}
			}
			//做標記
			buffer.markReaderIndex();
			//獲取資料包的傳送過來時的長度
			int readInt = buffer.readInt();
			//判斷buffer中剩餘的資料包長度是否大於單個數據包的長度(readInt)
			if(buffer.readableBytes() < readInt) {
				//返回到上次做標記的地方,因為此次資料讀取的不是一個完整的資料包。
				buffer.resetReaderIndex();
				//快取當前資料,等待剩下資料包到來
				return null;
			}
			//定義一個數據包的長度
			byte[] bt = new byte[readInt];
			//讀取資料
			buffer.readBytes(bt);
			//往下傳遞物件
			return new String(bt);
		}
		//快取當前資料包,等待第二次資料的到來
		return null;
	}

}

(4)、服務端,處理請求的handler。

package com.server;

import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;

public class HelloHandler extends SimpleChannelHandler {
	
	private int count = 1;

	@Override
	public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
		
		System.out.println(e.getMessage() + "  " +count);
		count++;
	}
}

(5)、結果展示(按順序列印):

            

5、解決辦法2:在訊息的尾部加一些特殊字元,那麼在讀取資料的時候,只要讀到這個特殊字元,就認為已經可以擷取一個完整的資料包了,這種情況在一定的業務情況下實用。