1. 程式人生 > >netty權威指南學習筆記六——編解碼技術之MessagePack

netty權威指南學習筆記六——編解碼技術之MessagePack

ssi add java exception 字節數組 ted evel thrift 發送

  編解碼技術主要應用在網絡傳輸中,將對象比如BOJO進行編解碼以利於網絡中進行傳輸。平常我們也會將編解碼說成是序列化/反序列化

  定義:當進行遠程跨進程服務調用時,需要把被傳輸的java對象編碼為字節數組或者ByteBuffer對象。而當遠程服務讀取到ByteBuffer對象或者字節數組時,需要將其解碼為發送時的java對象。這被稱為java對象編解碼技術。比如java的序列化。

  但是,java的序列化有一定的弊端;

  •   java序列化是java私有的協議,其他語言不支持,故而不能實現跨語言;
  •   其次,序列化後的碼流太大;
  •   再次,序列化性能太低,耗時長。

  因此,通常不會選擇java序列化作為遠程跨節點調用的編解碼框架。

  當前業界主流的編解碼框架有:1)MessagePack高效的二進制序列化框架;2)Google 的Protobuf;3)Facebook的Thrift;4)JBoss Marshalliing

下面運行MessagePack的編解碼

  這個示例在權威指南上,作者並沒有給出完整代碼,本博主剛開始運行也沒有運行出來,經過網絡搜索,參考相關文章運行了出來,其中潛在存在著一些坑,運行中本博主也發現一些現象也總結出來。

  一、首先我們需要引入相關jar包

 1         <dependency>
 2             <groupId>org.msgpack</
groupId> 3 <artifactId>msgpack</artifactId> 4 <version>0.6.11</version> 5 6 </dependency> 7 <!-- 創建項目時已經存在,這裏貼出來,但本博主不重復放包 8 <dependency> 9 <groupId>org.javassist</groupId>
10 <artifactId>javassist</artifactId> 11 <version>3.22.0-GA</version> 12 </dependency> 13 -->

二、放上編解碼代碼,

坑一、一定要在繼承的類後面加上泛型,這樣,方法中的參數才能跟著發生變化

如過沒有添加<>,則解碼實現接口方法時直接生成的方法的參數如下:

編碼代碼

 1 package com.messagePack;
 2 
 3 import io.netty.buffer.ByteBuf;
 4 import io.netty.channel.ChannelHandlerContext;
 5 import io.netty.handler.codec.MessageToByteEncoder;
 6 import org.msgpack.MessagePack;
 7 
 8 public class MsgPackEncoder extends MessageToByteEncoder<Object> {
 9     @Override
10     protected void encode(ChannelHandlerContext channelHandlerContext, Object o, ByteBuf byteBuf) throws Exception {
11         MessagePack msgPack = new MessagePack();
12 //      編碼,然後轉為ButyBuf傳遞
13         byte[] bytes = msgPack.write(o);
14         byteBuf.writeBytes(bytes);
15     }
16 }

解碼代碼

 1 package com.messagePack;
 2 
 3 import io.netty.buffer.ByteBuf;
 4 import io.netty.channel.ChannelHandlerContext;
 5 import io.netty.handler.codec.MessageToMessageDecoder;
 6 import org.msgpack.MessagePack;
 7 
 8 import java.util.List;
 9 
10 public class MsgPackDecoder extends MessageToMessageDecoder<ByteBuf> {
11     @Override
12     protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
13 //      獲取要解碼的byte數組
14         final byte[] bytes;
15         final int length = byteBuf.readableBytes();
16         bytes = new byte[length];
17         byteBuf.getBytes(byteBuf.readerIndex(),bytes,0,length);
18 //      調用MessagePack 的read方法將其反序列化為Object對象
19         MessagePack msgPack = new MessagePack();
20         list.add(msgPack.read(bytes));
21     }
22 }

如果實現的接口後面沒有添加泛型<ByteBuf>,則解碼實現接口方法時直接生成的方法的參數如下:

1  @Override
2     protected void decode(ChannelHandlerContext channelHandlerContext, Object o, List list) throws Exception {
3 
4     }

不過經過實驗,將接收到的Object進行轉換後仍然可以,要記好netty接收和傳遞信息都是經過ByteBuf進行的

 1     @Override
 2     protected void decode(ChannelHandlerContext channelHandlerContext, Object o, List list) throws Exception {
 3         ByteBuf byteBuf = (ByteBuf) o;
 4 //      獲取要解碼的byte數組
 5         final byte[] bytes;
 6         final int length = byteBuf.readableBytes();
 7         bytes = new byte[length];
 8         byteBuf.getBytes(byteBuf.readerIndex(),bytes,0,length);
 9 //      調用MessagePack 的read方法將其反序列化為Object對象
10         MessagePack msgPack = new MessagePack();
11         list.add(msgPack.read(bytes));
12     }

三、服務端代碼

 1 package com.messagePack;
 2 
 3 import io.netty.bootstrap.ServerBootstrap;
 4 import io.netty.channel.ChannelFuture;
 5 import io.netty.channel.ChannelInitializer;
 6 import io.netty.channel.ChannelOption;
 7 import io.netty.channel.nio.NioEventLoopGroup;
 8 import io.netty.channel.socket.SocketChannel;
 9 import io.netty.channel.socket.nio.NioServerSocketChannel;
10 import io.netty.handler.logging.LogLevel;
11 import io.netty.handler.logging.LoggingHandler;
12 
13 public class EchoServer {
14     public void bind(int port) throws InterruptedException {
15         NioEventLoopGroup bossGroup = new NioEventLoopGroup();
16         NioEventLoopGroup workGroup = new NioEventLoopGroup();
17         try {
18             ServerBootstrap b = new ServerBootstrap();
19             b.group(bossGroup,workGroup)
20                     .channel(NioServerSocketChannel.class)
21                     .option(ChannelOption.SO_BACKLOG,1024)
22                     .childHandler(new LoggingHandler(LogLevel.INFO))
23                     .childHandler(new ChannelInitializer<SocketChannel>() {
24                         @Override
25                         protected void initChannel(SocketChannel socketChannel) throws Exception {
26                             socketChannel.pipeline()
27                                     .addLast("decoder",new MsgPackDecoder())
28                                     .addLast("encoder",new MsgPackEncoder())
29                                     .addLast(new EchoServerHandler());
30                         }
31                     });
32 //          綁定端口,同步等待成功
33             ChannelFuture f = b.bind(port).sync();
34 //          等待服務端監聽端口關閉
35             f.channel().closeFuture().sync();
36         } finally {
37             bossGroup.shutdownGracefully();
38             workGroup.shutdownGracefully();
39         }
40     }
41     public static void main(String[] args) throws InterruptedException {
42         int port = 8080;
43         if(args.length>0&&args!=null){
44             port = Integer.parseInt(args[0]);
45         }
46         new EchoServer().bind(port);
47 
48     }
49 }

IO處理

 1 package com.messagePack;
 2 
 3 import io.netty.buffer.ByteBuf;
 4 import io.netty.buffer.Unpooled;
 5 import io.netty.channel.ChannelHandlerContext;
 6 import io.netty.channel.ChannelInboundHandlerAdapter;
 7 
 8 import java.util.List;
 9 
10 public class EchoServerHandler extends ChannelInboundHandlerAdapter {
11     int count;
12     @Override
13     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
14         System.out.println("server receive the msgpack message : "+msg+"");
15         // 原路返回給客戶端
16         ctx.writeAndFlush(msg);
17 /*        在EchoClientHandler中向服務端發送一個pojo對象,經過MessagePack編解碼後,
18         在EchoServerHandler中的channelRead方法中打印的msg為pojo對象的toString方法內容,
19         不可以直接將msg轉換為User,如果采用如下代碼運行不成功*/
20 /*        List<User> users = (List<User>) msg;
21         System.out.println("到這裏面來了,users是否為空:");
22         System.out.println(users!=null);
23         for(User u : users){
24             System.out.println("This is"+ ++count +" times server receive client request."+u);
25             ctx.write(u);
26         }
27 
28         ctx.flush();*/
29     }
30 
31     @Override
32     public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
33         ctx.flush();
34     }
35 
36     @Override
37     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
38         ctx.close();
39     }
40 }

坑二、這裏的坑比較大,在上述代碼註釋中已經說明了,下面運行結果是註釋掉以上14~16行代碼,放開20~28行代碼時候運行結果,結果表明收到的消息轉化為User數組時候,是空的,但是後臺並沒有報錯,不知道為什麽在我的IDEA上運行不下去但是不報錯

技術分享圖片

四、客戶端代碼

 1 package com.messagePack;
 2 
 3 import io.netty.bootstrap.Bootstrap;
 4 import io.netty.buffer.ByteBuf;
 5 import io.netty.buffer.Unpooled;
 6 import io.netty.channel.ChannelFuture;
 7 import io.netty.channel.ChannelInitializer;
 8 import io.netty.channel.ChannelOption;
 9 import io.netty.channel.nio.NioEventLoopGroup;
10 import io.netty.channel.socket.SocketChannel;
11 import io.netty.channel.socket.nio.NioSocketChannel;
12 import io.netty.handler.codec.DelimiterBasedFrameDecoder;
13 import io.netty.handler.codec.string.StringDecoder;
14 
15 public class EchoClient {
16     public void connection(int port,String host) throws InterruptedException {
17         NioEventLoopGroup workGroup = new NioEventLoopGroup();
18         try {
19             Bootstrap b = new Bootstrap();
20             b.group(workGroup)
21                     .channel(NioSocketChannel.class)
22                     .option(ChannelOption.TCP_NODELAY,true)
23                     .option(ChannelOption.CONNECT_TIMEOUT_MILLIS,3000)
24                     .handler(new ChannelInitializer<SocketChannel>() {
25                         @Override
26                         protected void initChannel(SocketChannel socketChannel) throws Exception {
27                             socketChannel.pipeline().addLast("msgpack decoder",new MsgPackDecoder())
28                                     .addLast("msgpack encoder",new MsgPackEncoder())
29                                     .addLast(new EchoClientHandler());
30 //
31                         }
32                     });
33 //            發起異步連接操作
34             ChannelFuture f = b.connect(host,port).sync();
35 //                          等待客戶端鏈路關閉
36             f.channel().closeFuture().sync();
37         } finally {
38             workGroup.shutdownGracefully();
39         }
40     }
41     public static void main(String[] args) throws InterruptedException {
42         int port = 8080;
43         if(args.length>0&&args!=null){
44             System.out.println(args[0]);
45             port = Integer.parseInt(args[0]);
46         }
47         new EchoClient().connection(port,"127.0.0.1");
48     }
49 }

IO處理類

 1 package com.messagePack;
 2 
 3 import io.netty.channel.ChannelHandlerContext;
 4 import io.netty.channel.ChannelInboundHandlerAdapter;
 5 
 6 public class EchoClientHandler extends ChannelInboundHandlerAdapter {
 7     private int count;
 8 
 9     @Override
10     public void channelActive(ChannelHandlerContext ctx) throws Exception {
11        /* User user = getUser();
12         ctx.writeAndFlush(user);*/
13         User[] users = getUsers();
14         for(User u : users){
15             ctx.write(u);
16         }
17         ctx.flush();
18     }
19 
20     @Override
21     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
22         System.out.println("this is client receive msg【  "+ ++count +"  】times:【"+msg+"】");
23        if(count<5){ //控制運行次數,因為不加這個控制直接調用下面代碼的話,客戶端和服務端會形成閉環循環,一直運行
24         ctx.write(msg);
25         }
26     }
27 
28     @Override
29     public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
30         ctx.flush();
31     }
32 
33     @Override
34     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
35         ctx.close();
36     }
37     private User[] getUsers(){
38         User[] users = new User[5];
39         for(int i=0;i<5;i++){
40             User user = new User();
41             user.setId(String.valueOf(i));
42             user.setAge(18+i);
43             user.setName("張元"+i);
44             user.setSex("男"+String.valueOf(i*2));
45             users[i]=user;
46         }
47         return users;
48     }
49 
50     private User getUser(){
51             User user = new User();
52             user.setId("11");
53             user.setAge(18);
54             user.setName("張元");
55             user.setSex("男");
56         return user;
57     }
58 }

五、User代碼

坑三、要傳輸的javabean一定要加上註解@message

 1 package com.messagePack;
 2 
 3 import org.msgpack.annotation.Message;
 4 
 5 @Message
 6 public class User {
 7     private String name;
 8     private int age;
 9     private String id;
10     private String sex;
11 
12     public int getAge() {
13         return age;
14     }
15 
16     public void setAge(int age) {
17         this.age = age;
18     }
19 
20     public String getId() {
21         return id;
22     }
23 
24     public void setId(String id) {
25         this.id = id;
26     }
27 
28     public String getSex() {
29         return sex;
30     }
31 
32     public void setSex(String sex) {
33         this.sex = sex;
34     }
35 
36     public String getName() {
37         return name;
38     }
39 
40     public void setName(String name) {
41         this.name = name;
42     }
43 
44     @Override
45     public String toString() {
46         return "User{" +
47                 "name=‘" + name + ‘\‘‘ +
48                 ", age=" + age +
49                 ", id=‘" + id + ‘\‘‘ +
50                 ", sex=‘" + sex + ‘\‘‘ +
51                 ‘}‘;
52     }
53 }

六、運行結果

客戶端

技術分享圖片

服務端

技術分享圖片

七、上面運行的結果發現打印的數據都完全一樣,這是因為沒有考慮粘包/半包的處理,還不能正常工作,下面我們利用Netty的LengthFieldPrepender和LengthFieldBasedFrameDecoder,來解決上述問題,這裏只需要對客戶端和服務端添加相關的處理類就可以了,改動代碼如下:

客戶端

 1 .handler(new ChannelInitializer<SocketChannel>() {
 2                         @Override
 3                         protected void initChannel(SocketChannel socketChannel) throws Exception {
 4                             socketChannel.pipeline()
 5                                     .addLast("frameDecoder",new LengthFieldBasedFrameDecoder(65535,
 6                                             0,4,0,4))
 7                                     .addLast("msgpack decoder",new MsgPackDecoder())
 8                                     .addLast("frameEncoder",new LengthFieldPrepender(4))
 9                                     .addLast("msgpack encoder",new MsgPackEncoder())
10                                     .addLast(new EchoClientHandler());
11 //
12                         }
13                     });

服務端

 1  .childHandler(new ChannelInitializer<SocketChannel>() {
 2                         @Override
 3                         protected void initChannel(SocketChannel socketChannel) throws Exception {
 4                             socketChannel.pipeline()
 5                                     .addLast("framDecoder",new LengthFieldBasedFrameDecoder(65535,
 6                                             0,4,0,4))
 7                                     .addLast("decoder",new MsgPackDecoder())
 8                                     .addLast("frameEncoder",new LengthFieldPrepender(4))
 9                                     .addLast("encoder",new MsgPackEncoder())
10                                     .addLast(new EchoServerHandler());
11                         }
12                     });

其實兩者改動的地方完全一樣,下面看一下運行效果

客戶端

技術分享圖片

服務端

技術分享圖片

這次運行結果顯示的是0~5的數據,但是遺憾的是不知道為什麽有重復了一下,而且之重復了部分的運行,暫時不理會。以後應用多了可能就明白了。或者有路過的朋友知道的留個言,謝謝!

  

netty權威指南學習筆記六——編解碼技術之MessagePack