1. 程式人生 > >netty原始碼分析(二十二)Netty編解碼器剖析與入站出站處理器詳解

netty原始碼分析(二十二)Netty編解碼器剖析與入站出站處理器詳解

Netty處理器重要概念:
1、Netty的處理器可以分為兩類:入棧處理器和出棧處理器。
2、入棧處理器的頂層是ChannelInboundHandler,出棧處理器的頂層是ChannelOutboundHandler。
3、資料處理時常用的各種編解碼器本質上都是處理器。
4、編解碼器:無論我們是向網路中寫入資料是什麼型別(int、char、String、二進位制等),資料在網路中傳遞時,其都是以位元組流的形式出現的;將資料由原本的形式轉換為位元組流的操作稱為編碼(encode),將資料由位元組轉換為它原本的格式或是其他格式的操作稱為解碼(decode),編碼統一稱為codec。
5、編碼:本質上是一種出棧處理器;因此,編碼一定是一種ChannelOutboundHandler。
6、解碼:本質上是一種入棧處理器,因此。解碼一定是一種ChannelInboundHandler。
7、在Netty中,編碼器通常以XXXEncoder命名;解碼器通常以XXXDecoder命名。

netty下邊有很多編解碼器的實現:
這裡寫圖片描述

實際開發的過程中我們可以去使用它們,我們要講的不是去使用它們,現在以一個例子來說明編解碼的一些內幕:
netty提供了一個位元組到訊息的轉換器(ByteToMessageDecoder):
這裡寫圖片描述
接下來我們使用ByteToMessageDecoder自己實現一個解碼器:


public class MyByteToLongDecoder extends ByteToMessageDecoder {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws
Exception { System.out.println("decode invoked"); System.out.println(in.readableBytes()); if(in.readableBytes()>=8){ out.add(in.readLong()); } } }

然後需要一個將Long型別的資料轉換為byte的書裝換器:
使用MessageToByteEncoder:
服務端:

/**
 * Created by Administrator on 2017/5/20.
 * 伺服器和客戶端互發程式
 */
public class MyServer { public static void main(String[] args) throws InterruptedException { EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try{ ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class) .childHandler(new MyServerInitializer()); ChannelFuture channelFuture = serverBootstrap.bind(8899).sync(); channelFuture.channel().closeFuture().sync(); }finally{ bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }

服務端初始化:

public class MyServerInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipline = ch.pipeline();
        pipline.addLast(new MyByteToLongDecoder());
        pipline.addLast(new MyLongToByteEncoder());
        pipline.addLast(new MyServerHandler());
    }
}

服務端handler:

public class MyServerHandler extends SimpleChannelInboundHandler<Long> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Long msg) throws Exception {
        System.out.println(ctx.channel().remoteAddress()+" --> "+msg);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

客戶端:

public class Myclient {
    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class).handler(new MyClientIniatializer());
            ChannelFuture channelFuture = bootstrap.connect("localhost", 8899).sync();
            channelFuture.channel().writeAndFlush("hello");
            channelFuture.channel().closeFuture().sync();
        } finally {
            eventLoopGroup.shutdownGracefully();
        }
    }
}

客戶端的initializer:

public class MyClientIniatializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipline = ch.pipeline();

        pipline.addLast(new MyByteToLongDecoder());
        pipline.addLast(new MyLongToByteEncoder());
        pipline.addLast(new MyClientHandler());
    }
}

客戶端handler:

public class MyClientHandler extends SimpleChannelInboundHandler<Long> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Long msg) throws Exception {
        System.out.println(ctx.channel().remoteAddress());
        System.out.println("client output "+msg);
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.writeAndFlush(123456L);//一定要加L,否則會作為int型別處理,最終導致訊息傳送不出去。
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        super.exceptionCaught(ctx, cause);
        ctx.close();
    }
}

位元組到Long型別的解碼器(解析網路傳過來的資料):

public class MyByteToLongDecoder extends ByteToMessageDecoder {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        System.out.println("decode invoked");
        System.out.println(in.readableBytes());
        if(in.readableBytes()>=8){
                out.add(in.readLong());
        }
    }
}

Long型別轉換為位元組(傳送到網路之前的轉換):

public class MyLongToByteEncoder extends MessageToByteEncoder<Long> {
    @Override
    protected void encode(ChannelHandlerContext ctx, Long msg, ByteBuf out) throws Exception {
        System.out.println("encode invoked");
        System.out.println(msg);
        out.writeLong(msg);
    }
}

執行服務端,之後執行客戶端,列印輸出:
server端輸出:

decode invoked
8
/127.0.0.1:4679 --> 123456

client輸出:

encode invoked
123456

至於為什麼是這樣一個過程:
首先客戶端啟動之後,呼叫MyLongToByteEncoder的encode方法列印“encode invoked”和傳送的資料“123456”。
服務段接受到之後呼叫MyByteToLongDecoder的decode列印“decode invoked”和資料長度“8”,之後是呼叫MyClientHandler的channelRead0列印“/127.0.0.1:4679 –> 123456”

接下來我們修改MyClientHandler如下:

public class MyClientHandler extends SimpleChannelInboundHandler<Long> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Long msg) throws Exception {
        System.out.println(ctx.channel().remoteAddress());
        System.out.println("client output "+msg);
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // ctx.writeAndFlush(123456L);
    /*
        都可以傳送
        ctx.writeAndFlush(1L);
        ctx.writeAndFlush(2L);
        ctx.writeAndFlush(3L);
        ctx.writeAndFlush(4L);*/

        ctx.writeAndFlush(Unpooled.copiedBuffer("helloworld", Charset.forName("utf-8")));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        super.exceptionCaught(ctx, cause);
        ctx.close();
    }
}

ctx.writeAndFlush(Unpooled.copiedBuffer(“helloworld”, Charset.forName(“utf-8”)));
這行程式碼,即傳送一個Buffer,我們檢視控制檯列印的資料:
server端的列印:

decode invoked
10
/127.0.0.1:6394 --> 7522537965574647666
decode invoked
2

客戶端沒有任何輸出,因為我們的客戶端使用的是ByteBuff,而客戶端的編碼器是Long型別的,所以編碼器沒有執行,直接把資料丟給了Socket傳輸到了網路,所以服務端會收到資料,我們傳送的資料是“helloworld”由於是utf-8,所以一個英文字元是一個位元組,一共是10個位元組,我們解碼器只有在大於8個位元組的時候才會對其進行解碼然後給到下一個處理器,所以10個位元組前8個通過了解碼器,去了下一個handler,而剩下的2個沒有通過解碼器,服務端列印的“/127.0.0.1:6394 –> 7522537965574647666”後邊的那串數字是8個位元組的資料。

關於netty編解碼器的重要結論:
1、無論是編碼器還是解碼器,其接受的訊息型別必須要與待處理的引數型別一致,否則該編碼器或解碼器並不會執行。
2、在解碼器進行資料解碼時,一定要記得判斷緩衝(ByteBuf)中的資料是否 足夠,否則將會產生一些問題。
例如上邊的例子判斷是否是8個長度(因為long是佔用8個位元組的資料型別):

    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        System.out.println("decode invoked");
        System.out.println(in.readableBytes());
        if(in.readableBytes()>=8){
                out.add(in.readLong());
        }
    }