1. 程式人生 > >Java網路程式設計之Netty拆包和黏包-yellowcong

Java網路程式設計之Netty拆包和黏包-yellowcong

Netty中,解決拆包和黏包中,解決方式有三種
1、在每個包尾部,定義分隔符,通過回車符號,或者其他符號來解決
2、通過定義每個包的大小,如果包不夠就空格填充
3、自定義協議的方式,將訊息分為訊息頭和訊息體,在訊息頭中表示出訊息的總長度,然後進行邏輯處理。

案例

這個案例是通過第一種方式,通過回車符號的方式來解決拆包和黏包,通過在childHandler 中新增 指定的分隔符進行拆包黏包,通過DelimiterBasedFrameDecoder (定義分隔符)和StringDecoder(將傳送的訊息定義為String型別,而不是預設的ByteBuf型別了)這兩個類完成處理,在客戶端和服務端,都要 設定相同的Handler。

服務戶端

伺服器端,重點是childHandler ,自定義了包的分隔符

package yellowcong.socket.netty2;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import
io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.DelimiterBasedFrameDecoder; import io.netty.handler.codec.string.StringDecoder; /** * 建立日期:2017年10月8日 <br/> * 建立使用者:yellowcong <br/> * 功能描述: */
public class Server { public static final Integer PORT = 8080; public static void main(String[] args) throws Exception { //1.第一個執行緒組 是用於接收Client端連線的 EventLoopGroup bootGroup = new NioEventLoopGroup(); //客戶端 //2.第二個執行緒組 實際業務操作請求 EventLoopGroup workGroup = new NioEventLoopGroup(); //網路讀寫 //3.伺服器啟動類,配置伺服器 ServerBootstrap bootstrap = new ServerBootstrap(); //加入客戶端執行緒和網路讀寫 bootstrap.group(bootGroup, workGroup) //我要指定使用NioServerSocketChannel這種型別的通道 ,當我們是Http的時候,需要更換這個Channel的型別 .channel(NioServerSocketChannel.class) // 指定處理SockerChannel 的處理器 .childHandler(new ChannelInitializer<SocketChannel>(){ //一掉要注意這個 SocketChannel 是Netty封裝的,不是NIO @Override protected void initChannel(SocketChannel ch) throws Exception { //將我們的伺服器處理類傳遞進去 //設定回車就是一條訊息 ByteBuf spilt = Unpooled.copiedBuffer("\r\n".getBytes()); ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, spilt)); //字串形式的解碼 ch.pipeline().addLast(new StringDecoder()); ch.pipeline().addLast(new ServerHandler()); } }) //設定大小 .option(ChannelOption.SO_BACKLOG, 128) //生產環境中,最好配額制100多 //保持連線 .childOption(ChannelOption.SO_KEEPALIVE, true) ; System.out.println("伺服器啟動。。。。。。"); //繫結指定的埠 進行監聽 ChannelFuture future = bootstrap.bind(PORT).sync(); //如果不休眠 ,直接就結束了 // Thread.sleep(1000000); //關閉Channel //這個是相當於程式是睡眠模式,執行緒阻塞在這個地方 future.channel().closeFuture().sync(); //關閉執行緒組 bootGroup.shutdownGracefully(); workGroup.shutdownGracefully(); } }

服務端訊息處理

訊息處理端,複寫了channelReadComplete 方法,用來斷開連線操作,其次在channelRead中,訊息是String型別的,不能轉化為ByteBuf型別處理。訊息傳送的時候,加上了分隔符(\r\n)

package yellowcong.socket.netty2;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

/**
 * 建立日期:2017年10月8日 <br/>
 * 建立使用者:yellowcong <br/>
 * 功能描述:伺服器請求 
 */

//這個Handler需要繼承 ChannelHandlerAdapter,這個是Netty實現的
public class ServerHandler extends ChannelHandlerAdapter{

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //由於設定的解碼器是String 型別的,這個地方就Msg物件是String型別的
        //讀取Message
        String result = (String) msg;
        System.out.println(result);
        //回饋客戶端
        ctx.writeAndFlush(Unpooled.copiedBuffer("Hello,Server get Data\r\n".getBytes()));

    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {

        //設定響應後就關閉程式
        ctx.close().addListener(ChannelFutureListener.CLOSE);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        //用來處理Netty執行過程中的異常
        cause.printStackTrace(); //列印錯誤
        ctx.close(); //關閉容器
    }




}

客戶端

客戶端,也是在handler 的處理類中,增加了兩個處理器,這樣保證伺服器和客戶端的處理訊息的方式一致,傳送訊息的時候加上了分隔符(\r\n)

package yellowcong.socket.netty2;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;

/**
 * 建立日期:2017年10月8日 <br/>
 * 建立使用者:yellowcong <br/>
 * 功能描述:  
 */
public class Client {

    public static void main(String[] args) throws InterruptedException {

        EventLoopGroup workGroup = new NioEventLoopGroup();

        //客戶端啟動
        Bootstrap boot  = new Bootstrap();

        boot.group(workGroup)
        .channel(NioSocketChannel.class) //客戶端的Socker
        .handler(new ChannelInitializer<SocketChannel>(){ //一掉要注意這個 SocketChannel 是Netty封裝的,不是NIO

            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                //將我們的伺服器處理類傳遞進去
                ByteBuf spilt = Unpooled.copiedBuffer("\r\n".getBytes());
                ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, spilt));
                //字串形式的解碼
                ch.pipeline().addLast(new StringDecoder());
                ch.pipeline().addLast(new ClientHandler());
            }
        });

        //繫結指定的埠 進行監聽
        ChannelFuture channel = boot.connect("127.0.0.1", 8080).sync();

        //傳送資料
        channel.channel().writeAndFlush(Unpooled.copiedBuffer("hello Client1\r\n".getBytes()));

        channel.channel().writeAndFlush(Unpooled.copiedBuffer("hello Client2\r\n".getBytes()));

        channel.channel().writeAndFlush(Unpooled.copiedBuffer("hello Client3\r\n".getBytes()));

        //這個相當於沒有關閉Channel,註釋
        channel.channel().closeFuture().sync();

        //關閉執行緒組
        workGroup.shutdownGracefully();
    }
}

客戶端訊息處理類

訊息處理類,沒做任何改變,只是讀取訊息

package yellowcong.socket.netty2;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.ReferenceCountUtil;

/**
 * 建立日期:2017年10月8日 <br/>
 * 建立使用者:yellowcong <br/>
 * 功能描述:客戶端請求的處理
 */
public class ClientHandler extends ChannelHandlerAdapter{

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        //列印錯誤
        cause.printStackTrace();

        //關閉容器
        ctx.close();
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        try {
            //獲取message
            String data = (String) msg;

            //列印資料
            System.out.println("Client:\t"+data);

        } catch (Exception e) {
            e.printStackTrace();
        }finally{
            //釋放訊息
            ReferenceCountUtil.release(msg);
        }
    }
}