java網路程式設計之Netty流資料的傳輸處理(五)
Netty流資料的傳輸處理
Socket Buffer的缺陷
對於例如TCP/IP這種基於流的傳輸協議實現,接收到的資料會被儲存在socket的接受緩衝區內。不幸的是,這種基於流的傳輸緩衝區並不是一個包佇列,而是一個位元組佇列。這意味著,即使你以兩個資料包的形式傳送了兩條訊息,作業系統卻不會把它們看成是兩條訊息,而僅僅是一個批次的位元組序列。因此,在這種情況下我們就無法保證收到的資料恰好就是遠端節點所傳送的資料。例如,讓我們假設一個作業系統的TCP/IP堆疊收到了三個資料包:
ABC | DEF | GHI |
---|
由於這種流傳輸協議的普遍性質,在你的應用中有較高的可能會把這些資料讀取為另外一種形式:
ABCDE | FG | H | I |
---|
因此對於資料的接收方,不管是服務端還是客戶端,應當重構這些接收到的資料,讓其變成一種可讓你的應用邏輯易於理解的更有意義的資料結構。在上面所述的這個例子中,接收到的資料應當重構為下面的形式:
ABC | DEF | GHI |
---|
第一種解決方案(使用特殊字元分割)
Netty提供了一個分隔符類DelimiterBasedFrameDecoder(自定義分隔符)
下面的開發我是居於我的Netty第一個開發程式來講的,沒看過我的這篇文章可以先看看,想信你在Netty第一個開發程式會捕獲很多你想不到的知識。
服務端
public class Server {
public static void main(String[] args) throws Exception{
//1 建立2個執行緒,一個是負責接收客戶端的連線。一個是負責進行資料傳輸的
EventLoopGroup pGroup = new NioEventLoopGroup();
EventLoopGroup cGroup = new NioEventLoopGroup();
//2 建立伺服器輔助類
ServerBootstrap b = new ServerBootstrap();
b.group(pGroup, cGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.option(ChannelOption.SO_SNDBUF, 32*1024)
.option(ChannelOption.SO_RCVBUF, 32*1024)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel sc) throws Exception {
//1 設定特殊分隔符
ByteBuf buf = Unpooled.copiedBuffer("$_".getBytes());
//2
sc.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, buf));
//3 設定字串形式的解碼
sc.pipeline().addLast(new StringDecoder());
sc.pipeline().addLast(new ServerHandler());
}
});
//4 繫結連線
ChannelFuture cf = b.bind(8765).sync();
//等待伺服器監聽埠關閉
cf.channel().closeFuture().sync();
pGroup.shutdownGracefully();
cGroup.shutdownGracefully();
}
}
關於EventLoopGroup、ServerBootstrap等等之類的我都在Netty的第一個程式都講得很清楚了,需要了解的可以參考我的第一篇文章。
程式碼說明:
1、 Unpooled.copiedBuffer(“$_”.getBytes()) 這個是設定特殊分隔符返回的是Netty中的ByteBuf型別這裡我設定的是 $_
2、DelimiterBasedFrameDecoder()是處理分隔符的類
3、StringDecoder() 設定字串形式的解碼
注意這裡使用了StringDecoder()解碼成字串形式,並不像在“Netty的第一個程式”那種方式去轉換成字串。
服務端業務處理
public class ServerHandler extends ChannelHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println(" server channel active... ");
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
String request = (String)msg;
System.out.println("Server :" + msg);
String response = "伺服器響應:" + msg + "$_";
ctx.writeAndFlush(Unpooled.copiedBuffer(response.getBytes()));
}
}
這裡沒什麼可說的!看過我的Netty的第一個程式這篇文章大家都懂。
由於在服務端就使用了StringDecoder()解碼成字串形式,這裡不需要用ByteBuf去轉換成字串。
客戶端
public class Client {
public static void main(String[] args) throws Exception {
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel sc) throws Exception {
//1
ByteBuf buf = Unpooled.copiedBuffer("$_".getBytes());
//2
sc.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, buf));
//3
sc.pipeline().addLast(new StringDecoder());
sc.pipeline().addLast(new ClientHandler());
}
});
ChannelFuture cf = b.connect("127.0.0.1", 8765).sync();
cf.channel().writeAndFlush(Unpooled.wrappedBuffer("777$_".getBytes()));
cf.channel().writeAndFlush(Unpooled.wrappedBuffer("666$_".getBytes()));
cf.channel().writeAndFlush(Unpooled.wrappedBuffer("888$_".getBytes()));
//等待客戶端埠關閉
cf.channel().closeFuture().sync();
group.shutdownGracefully();
}
}
由於這裡客戶端也接收服務端返回的資料所以也採用了與服務端一樣的處理方式。
如果你看過我的Netty的第一個程式文章,你會發現當時我是休眠1s再進行傳送另一條的。到這目前你應該也知道我什麼這樣做了吧!
客戶端業務處理
public class ClientHandler extends ChannelHandlerAdapter{
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("client channel active... ");
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
try {
String response = (String)msg;
System.out.println("Client: " + response);
} finally {
ReferenceCountUtil.release(msg);
}
}
}
這裡沒什麼可說的!看過我的Netty的第一個程式這篇文章大家都懂。
好!到這第一種解決方案就編寫結束了,先啟動服務端,再啟動客戶端
客戶端列印如下:
客戶端簽到後服務端的列印如下:
第二種解決方案(定長)
Netty提供了一個定長類FixdeLengthFraneDecoder
使用這個定長的有個弊端:如果由多個欄位比如可變長度的欄位組成時這個時候並解決不了什麼問題,建議使用第一個解決方案。
FixdeLengthFraneDecoder的使用跟DelimiterBasedFrameDecoder差不多,由於程式碼都差不多一樣這裡我不做太多的說明。
服務端
public class Server {
public static void main(String[] args) throws Exception{
//建立2個執行緒,一個是負責接收客戶端的連線。一個是負責進行資料傳輸的
EventLoopGroup pGroup = new NioEventLoopGroup();
EventLoopGroup cGroup = new NioEventLoopGroup();
//建立伺服器輔助類
ServerBootstrap b = new ServerBootstrap();
b.group(pGroup, cGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.option(ChannelOption.SO_SNDBUF, 32*1024)
.option(ChannelOption.SO_RCVBUF, 32*1024)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel sc) throws Exception {
//1 設定定長字串接收
sc.pipeline().addLast(new FixedLengthFrameDecoder(3));
//2 設定字串形式的解碼
sc.pipeline().addLast(new StringDecoder());
sc.pipeline().addLast(new ServerHandler());
}
});
//4 繫結連線
ChannelFuture cf = b.bind(8765).sync();
//等待伺服器監聽埠關閉
cf.channel().closeFuture().sync();
pGroup.shutdownGracefully();
cGroup.shutdownGracefully();
}
}
1、FixedLengthFrameDecoder(3) 這裡設定定長字串接收具體設定多長自己定
2、StringDecoder() 設定字串形式的解碼
服務端業務處理
public class ServerHandler extends ChannelHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println(" server channel active... ");
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
String request = (String)msg;
System.out.println("Server :" + msg);
String response = request ;
ctx.writeAndFlush(Unpooled.copiedBuffer(response.getBytes()));
}
}
服務端
public class Client {
public static void main(String[] args) throws Exception {
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel sc) throws Exception {
sc.pipeline().addLast(new FixedLengthFrameDecoder(3));
sc.pipeline().addLast(new StringDecoder());
sc.pipeline().addLast(new ClientHandler());
}
});
ChannelFuture cf = b.connect("127.0.0.1", 8765).sync();
cf.channel().writeAndFlush(Unpooled.wrappedBuffer("777".getBytes()));
cf.channel().writeAndFlush(Unpooled.wrappedBuffer("666".getBytes()));
cf.channel().writeAndFlush(Unpooled.wrappedBuffer("888".getBytes()));
//等待客戶端埠關閉
cf.channel().closeFuture().sync();
group.shutdownGracefully();
}
}
客戶端業務處理
public class ClientHandler extends ChannelHandlerAdapter{
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("client channel active... ");
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
try {
String response = (String)msg;
System.out.println("Client: " + response);
} finally {
ReferenceCountUtil.release(msg);
}
}
}
好!到這第二種解決方案就編寫結束了,先啟動服務端,再啟動客戶端
客戶端列印如下:
客戶端簽到後服務端的列印如下: