1. 程式人生 > >Netty5使用者手冊之五:netty中流資料的傳輸處理問題

Netty5使用者手冊之五:netty中流資料的傳輸處理問題

處理流資料的傳輸

SocketBuffer的警告

在如tcp/ip的以流為基礎傳輸資料中,資料被接收後,被儲存在一個socket接收緩衝區中。不幸的是,這個以流為基礎的緩衝區buffer不是一個包packet的佇列,而是一個位元組byte佇列。這意味著,即使你傳送兩個訊息message作為2個獨立的包,操作以系統不會把他們作為兩個訊息message,而是僅僅當做一堆位元組。因此,無法保證你讀到的資料就是對方寫的資料,這就是tcp、ip中常見的拆包、粘包問題。例如,假設作業系統已經接收到了三個包,如下:        由於流傳輸的這個普通屬性,在讀取他們的時候將會存在很大的機率,這些資料會被分段成下面的幾部分:

       因此,作為一個接收方,不管它是服務端還是客戶端,都需要把接收到的資料整理成一個或多個有意義的並且能夠被應用程式容易理解的資料。以上面這個例子為例,被接收的資料應該被分段為一下幾部分:
要想通過應用程式的方式解決上面提到的,拆分粘包的問題,可以通過以下幾個部分來處理:

第一種解決方案

現在,讓我回想一下time客戶端的示例,我們會遇到相同的問題。一個32位位元組的整形資料是一個非常小的資料,並且它也不見得會經常被分段。然而,它確實也會被拆分到不同的資料段中,並且被拆分的可能性會隨著傳輸的增加而增加。         一種簡單的解決方式是建立一個內部積累的緩衝區,並且等待知道4個位元組都被接收到這個內部的緩衝區中。下面為修改後的客戶端TimeClientHandler實現類程式碼來修復這個問題:
package com.zzj.nio.netty;

import java.util.Date;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

/**
 * Time客戶端處理器類
 * @author zzj
 * @date Oct 19, 2016 5:48:56 PM
 */
public class TimeClientHandler extends ChannelInboundHandlerAdapter {

	private ByteBuf buf;
	
	
	 @Override
	public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
		buf.release();
		buf = null;
	}

	@Override
	public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
		buf = ctx.alloc().buffer(1);
	}

	@Override
	    public void channelRead(ChannelHandlerContext ctx, Object msg) {
	        ByteBuf m = (ByteBuf) msg; // (1)
	        buf.writeBytes(m);
	        System.out.println("ssss");
	        try {
	           if (buf.readableBytes()>=4) {
	        	   long currentTimeMillis = (buf.readUnsignedInt()-2208988800l)*1000l;
	        	   System.out.println(new Date(currentTimeMillis));
	        	   ctx.close();
	           }
	        } finally {
	            m.release();
	        }
	    }

	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
	   cause.printStackTrace();
       ctx.close();
	}
}
       1.ChannelHandler有兩個生命週期監聽方法:handlerAdded和handlerRemoved方法,你可以完成任意初始化任務,只要它不會被阻塞很長的時間。        2.首先,所有被接收到的資料會被積累到buf物件中。        3.然後,handler處理器必須檢測buf是搜包含了足夠多的資料,這裡個例子中是4個位元組,然後去處理實際的業務邏輯。否咋,netty將會重複呼叫channelRead方法當資料被呼叫,並且實際上所有的4個位元組都會被積累。

第二種解決方案

      儘管第一個解決方案能夠解決timeclient可能發生半包粘包的問題,但是修改的那個handler看起來不夠乾淨優雅。想一下,如果存在一個複雜的具有多個欄位的協議,比如包含很多很長的欄位,ChannelHandler的實現方式將會變得維護可困難。       可能你已經知道,我們可以為ChannelPipeline新增一個ChannelHandler處理器。因此,我們可以將一個複雜的ChannelHandler拆分成多個單獨的處理器來減少系統的複雜性。例如,你可以將TimeClientHandler拆分成兩個處理器:       1.TimeDecoder用來處理半包拆包的問題。       2.timeClientHandler原始版本的實現. Netty提供了一個可擴充套件的類,幫你完成TimeDecoder的開發:
/**
 * TimeClient的編碼器
 * @author zzj
 * @date Oct 20, 2016 1:39:38 PM
 */
public class TimeDecoder extends ByteToMessageDecoder {

	/* (non-Javadoc)
	 * @see io.netty.handler.codec.ByteToMessageDecoder#decode(io.netty.channel.ChannelHandlerContext, io.netty.buffer.ByteBuf, java.util.List)
	 */
	@Override
	protected void decode(ChannelHandlerContext arg0, ByteBuf arg1, List<Object> arg2) throws Exception {
		if (arg1.readableBytes()<4) {
			return;
		}
		arg2.add(arg1.readBytes(4));
	}
}
       1.ByteToMessageDecoder是ChannelHandler的一種實現類,它能夠很容易的處理半包粘包問題。
       2.無論何時,當新資料接收到時,ByteToMessageDecoder會呼叫一個內部可維護的decode方法來處理內部積累的buffer緩衝區。        3.decode方法可以決定當沒有足夠的資料時,不新增到out物件中。當有更多的資料接收到後,ByteToMessageDecoder會再次呼叫decode方法。        4.如果decode方法新增一個物件到out列表物件中,這意味著解碼器成功的解碼了一個訊息。ByteToMessageDecoder會釋放掉累計緩衝區已經讀取的部分。需要注意的是,我們沒有必要去解碼多條message訊息,因為By特ToMessageDecoder會一直呼叫decode方法直到沒有資料新增到out列表物件中。       下面為TimeClient中TimeDecoder後的程式碼:
                      bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel channel) throws Exception {

//新增業務處理器
channel.pipeline().addLast(new TimeDecoder(),new TimeClientHandler());
}
});

第三種解決方案:使用POJO代替ByteBuf

       前面的幾個例子中我們都是用ByteBuf作為協議訊息的原始資料結構,這一部分我們將改善time協議的客戶端和服務端通過pojo來代替ByteBuf物件.        使用POJO在ChannelHandler種的優勢很明顯: 通過從ChannelHandler中提取出ByteBuf的程式碼,你的handler處理器變得更好的可維護、可重用。在time客戶端和服務端的例子中,我們僅僅讀取一個32位元組的整形資料,並且使用ByteBuf不是一個主要的直接的問題。然而,你會發現當你需要實現一個真實的協議,分離程式碼變得非常的必要。首先,讓我們定義一個新的型別叫做UnixTime。
package com.zzj.nio.netty.time;

import java.util.Date;

/**
 * POJO類表示time
 * @author zzj
 * @date Oct 20, 2016 2:27:45 PM
 */
public class UnixTime {

	private final long value;

	public UnixTime(){
		this(System.currentTimeMillis() / 1000L + 2208988800L);
	}
	public UnixTime(long value) {
		this.value = value;
	}
	
	public long value() {
        return value;
    }

    @Override
    public String toString() {
        return new Date((value() - 2208988800L) * 1000L).toString();
    }
}
       現在我們修改TimeDecoder的程式碼,可以使用UnixTime來替代ByteBuf:
import java.util.List;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;

/**
 * TimeClient的編碼器
 * @author zzj
 * @date Oct 20, 2016 1:39:38 PM
 */
public class TimeDecoder extends ByteToMessageDecoder {

	/* (non-Javadoc)
	 * @see io.netty.handler.codec.ByteToMessageDecoder#decode(io.netty.channel.ChannelHandlerContext, io.netty.buffer.ByteBuf, java.util.List)
	 */
	@Override
	protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
		if (msg.readableBytes()<4) {
			return;
		}
		out.add(new UnixTime(msg.readUnsignedInt()));
	}
}
        修改瞭解碼器後,TimeClientHandler處理器中讀取訊息時,同樣需要修改,這裡不再使用ByteBuf物件。
package com.zzj.nio.netty.time;

import java.util.Date;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

/**
 * Time客戶端處理器類
 * 
 * @author zzj
 * @date Oct 19, 2016 5:48:56 PM
 */
public class TimeClientHandler extends ChannelInboundHandlerAdapter {

	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) {
		UnixTime m = (UnixTime) msg; // (1)
		System.out.println(new Date(m.value()));
		ctx.close();
	}

	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
		cause.printStackTrace();
		ctx.close();
	}
}
      是不是程式碼變得更簡單、優雅了。同樣的,服務端也可以這樣修改。下面為修改TimeServerHandler的程式碼:
package com.zzj.nio.netty.time;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

/**
 * @author zzj
 * @date Oct 19, 2016 5:00:50 PM
 */
public class TimeServerHandler extends ChannelInboundHandlerAdapter {

    @Override
	public void channelActive(final ChannelHandlerContext ctx) throws Exception {
    	ChannelFuture f = ctx.writeAndFlush(new UnixTime());
    	f.addListener(ChannelFutureListener.CLOSE);
	}

	@Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}
      現在,還沒有做的就只有編碼器了,通過試下你ChannelHandler來將ByteBuf物件轉換為UnixTIme物件。不過這已經是非常簡單了,因為當你對一個訊息編碼的時候,你不需要再處理拆包和組裝的過程。
package com.zzj.nio.netty.time;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;

/**
 * TimeClient的編碼器
 * @author zzj
 * @date Oct 20, 2016 1:39:38 PM
 */
public class TimeEncoder extends ChannelOutboundHandlerAdapter {

	@Override
	public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
		UnixTime unixTime = (UnixTime)msg;
		ByteBuf encoded = ctx.alloc().buffer(4);
		
        encoded.writeInt((int)unixTime.value());
        ctx.write(encoded, promise); // (1)
	}
}
        第一, 通過ChannelPromise,當編碼後的資料被寫到了通道上Netty可以通過這個物件標記是成功還是失敗。         第二, 我們不需要呼叫cxt.flush()。因為處理器已經單獨分離出了一個方法void flush(ChannelHandlerContext cxt),如果像自己實現flush方法內容可以自行覆蓋這個方法。
最後的任務就是在TimeServerHandler之前把TimeEncoder插入到ChannelPipeline。但這是不那麼重要的工作。

優雅的關閉EveltLoopGroup

 bossGroup.shutdownGracefully();
 workerGroup.shutdownGracefully();