簡介

在之前的文章中我們提到了,對於NioSocketChannel來說,它不接收最基本的string訊息,只接收ByteBuf和FileRegion。但是ByteBuf是以二進位制的形式進行處理的,對於程式設計師來說太不直觀了,處理起來也比較麻煩,有沒有可能直接處理java簡單物件呢?本文將會探討一下這個問題。

decode和encode

比如我們需要直接向channel中寫入一個字串,在之前的文章中,我們知道這是不可以的,會報下面的錯誤:

  1. DefaultChannelPromise@57f5c075(failure: java.lang.UnsupportedOperationException: unsupported message type: String (expected: ByteBuf, FileRegion))

也就說ChannelPromise只接受ByteBuf和FileRegion,那麼怎麼做呢?

既然ChannelPromise只接受ByteBuf和FileRegion,那麼我們就需要把String物件轉換成ByteBuf即可。

也就是說在寫入String之前把String轉換成ByteBuf,當要讀取資料的時候,再把ByteBuf轉換成String。

我們知道ChannelPipeline中可以新增多個handler,並且控制這些handler的順序。

那麼我們的思路就出來了,在ChannelPipeline中新增一個encode,用於資料寫入的是對資料進行編碼成ByteBuf,然後再新增一個decode,用於在資料寫出的時候對資料進行解碼成對應的物件。

encode,decode是不是很熟悉?對了,這就是物件的序列化。

物件序列化

netty中物件序列化是要把傳輸的物件和ByteBuf直接互相轉換,當然我們可以自己實現這個轉換物件。但是netty已經為我們提供了方便的兩個轉換類:ObjectEncoder和ObjectDecoder。

先看ObjectEncoder,他的作用就是將物件轉換成為ByteBuf。

這個類很簡單,我們對其分析一下:

  1. public class ObjectEncoder extends MessageToByteEncoder<Serializable> {
  2. private static final byte[] LENGTH_PLACEHOLDER = new byte[4];
  3. @Override
  4. protected void encode(ChannelHandlerContext ctx, Serializable msg, ByteBuf out) throws Exception {
  5. int startIdx = out.writerIndex();
  6. ByteBufOutputStream bout = new ByteBufOutputStream(out);
  7. ObjectOutputStream oout = null;
  8. try {
  9. bout.write(LENGTH_PLACEHOLDER);
  10. oout = new CompactObjectOutputStream(bout);
  11. oout.writeObject(msg);
  12. oout.flush();
  13. } finally {
  14. if (oout != null) {
  15. oout.close();
  16. } else {
  17. bout.close();
  18. }
  19. }
  20. int endIdx = out.writerIndex();
  21. out.setInt(startIdx, endIdx - startIdx - 4);
  22. }
  23. }

ObjectEncoder繼承了MessageToByteEncoder,而MessageToByteEncoder又繼承了ChannelOutboundHandlerAdapter。為什麼是OutBound呢?這是因為我們是要對寫入的物件進行轉換,所以是outbound。

首先使用ByteBufOutputStream對out ByteBuf進行封裝,在bout中,首先寫入了一個LENGTH_PLACEHOLDER欄位,用來表示stream中中Byte的長度。然後用一個CompactObjectOutputStream對bout進行封裝,最後就可以用CompactObjectOutputStream寫入物件了。

對應的,netty還有一個ObjectDecoder物件,用於將ByteBuf轉換成對應的物件,ObjectDecoder繼承自LengthFieldBasedFrameDecoder,實際上他是一個ByteToMessageDecoder,也是一個ChannelInboundHandlerAdapter,用來對資料讀取進行處理。

我們看下ObjectDecoder中最重要的decode方法:

  1. protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
  2. ByteBuf frame = (ByteBuf) super.decode(ctx, in);
  3. if (frame == null) {
  4. return null;
  5. }
  6. ObjectInputStream ois = new CompactObjectInputStream(new ByteBufInputStream(frame, true), classResolver);
  7. try {
  8. return ois.readObject();
  9. } finally {
  10. ois.close();
  11. }
  12. }

上面的程式碼可以看到,將輸入的ByteBuf轉換為ByteBufInputStream,最後轉換成為CompactObjectInputStream,就可以直接讀取物件了。

使用編碼和解碼器

有了上面兩個編碼解碼器,直接需要將其新增到client和server端的ChannelPipeline中就可以了。

對於server端,其核心程式碼如下:

  1. //定義bossGroup和workerGroup
  2. EventLoopGroup bossGroup = new NioEventLoopGroup(1);
  3. EventLoopGroup workerGroup = new NioEventLoopGroup();
  4. try {
  5. ServerBootstrap b = new ServerBootstrap();
  6. b.group(bossGroup, workerGroup)
  7. .channel(NioServerSocketChannel.class)
  8. .handler(new LoggingHandler(LogLevel.INFO))
  9. .childHandler(new ChannelInitializer<SocketChannel>() {
  10. @Override
  11. public void initChannel(SocketChannel ch) throws Exception {
  12. ChannelPipeline p = ch.pipeline();
  13. p.addLast(
  14. // 新增encoder和decoder
  15. new ObjectEncoder(),
  16. new ObjectDecoder(ClassResolvers.cacheDisabled(null)),
  17. new PojoServerHandler());
  18. }
  19. });
  20. // 繫結埠,並準備接受連線
  21. b.bind(PORT).sync().channel().closeFuture().sync();

同樣的,對於client端,我們其核心程式碼如下:

  1. EventLoopGroup group = new NioEventLoopGroup();
  2. try {
  3. Bootstrap b = new Bootstrap();
  4. b.group(group)
  5. .channel(NioSocketChannel.class)
  6. .handler(new ChannelInitializer<SocketChannel>() {
  7. @Override
  8. public void initChannel(SocketChannel ch) throws Exception {
  9. ChannelPipeline p = ch.pipeline();
  10. p.addLast(
  11. // 新增encoder和decoder
  12. new ObjectEncoder(),
  13. new ObjectDecoder(ClassResolvers.cacheDisabled(null)),
  14. new PojoClientHandler());
  15. }
  16. });
  17. // 建立連線
  18. b.connect(HOST, PORT).sync().channel().closeFuture().sync();

可以看到上面的邏輯就是將ObjectEncoder和ObjectDecoder新增到ChannelPipeline中即可。

最後,就可以在客戶端和瀏覽器端通過呼叫:

  1. ctx.write("加油!");

直接寫入字串物件了。

總結

有了ObjectEncoder和ObjectDecoder,我們就可以不用受限於ByteBuf了,程式的靈活程度得到了大幅提升。

本文的例子可以參考:learn-netty4

本文已收錄於 http://www.flydean.com/08-netty-pojo-buf/

最通俗的解讀,最深刻的乾貨,最簡潔的教程,眾多你不知道的小技巧等你來發現!

歡迎關注我的公眾號:「程式那些事」,懂技術,更懂你!