netty權威指南學習筆記六——編解碼技術之MessagePack
編解碼技術主要應用在網絡傳輸中,將對象比如BOJO進行編解碼以利於網絡中進行傳輸。平常我們也會將編解碼說成是序列化/反序列化
定義:當進行遠程跨進程服務調用時,需要把被傳輸的java對象編碼為字節數組或者ByteBuffer對象。而當遠程服務讀取到ByteBuffer對象或者字節數組時,需要將其解碼為發送時的java對象。這被稱為java對象編解碼技術。比如java的序列化。
但是,java的序列化有一定的弊端;
- java序列化是java私有的協議,其他語言不支持,故而不能實現跨語言;
- 其次,序列化後的碼流太大;
- 再次,序列化性能太低,耗時長。
因此,通常不會選擇java序列化作為遠程跨節點調用的編解碼框架。
當前業界主流的編解碼框架有:1)MessagePack高效的二進制序列化框架;2)Google 的Protobuf;3)Facebook的Thrift;4)JBoss Marshalliing
下面運行MessagePack的編解碼
這個示例在權威指南上,作者並沒有給出完整代碼,本博主剛開始運行也沒有運行出來,經過網絡搜索,參考相關文章運行了出來,其中潛在存在著一些坑,運行中本博主也發現一些現象也總結出來。
一、首先我們需要引入相關jar包
1 <dependency> 2 <groupId>org.msgpack</groupId> 3 <artifactId>msgpack</artifactId> 4 <version>0.6.11</version> 5 6 </dependency> 7 <!-- 創建項目時已經存在,這裏貼出來,但本博主不重復放包 8 <dependency> 9 <groupId>org.javassist</groupId>10 <artifactId>javassist</artifactId> 11 <version>3.22.0-GA</version> 12 </dependency> 13 -->
二、放上編解碼代碼,
坑一、一定要在繼承的類後面加上泛型,這樣,方法中的參數才能跟著發生變化
如過沒有添加<>,則解碼實現接口方法時直接生成的方法的參數如下:
編碼代碼
1 package com.messagePack; 2 3 import io.netty.buffer.ByteBuf; 4 import io.netty.channel.ChannelHandlerContext; 5 import io.netty.handler.codec.MessageToByteEncoder; 6 import org.msgpack.MessagePack; 7 8 public class MsgPackEncoder extends MessageToByteEncoder<Object> { 9 @Override 10 protected void encode(ChannelHandlerContext channelHandlerContext, Object o, ByteBuf byteBuf) throws Exception { 11 MessagePack msgPack = new MessagePack(); 12 // 編碼,然後轉為ButyBuf傳遞 13 byte[] bytes = msgPack.write(o); 14 byteBuf.writeBytes(bytes); 15 } 16 }
解碼代碼
1 package com.messagePack; 2 3 import io.netty.buffer.ByteBuf; 4 import io.netty.channel.ChannelHandlerContext; 5 import io.netty.handler.codec.MessageToMessageDecoder; 6 import org.msgpack.MessagePack; 7 8 import java.util.List; 9 10 public class MsgPackDecoder extends MessageToMessageDecoder<ByteBuf> { 11 @Override 12 protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception { 13 // 獲取要解碼的byte數組 14 final byte[] bytes; 15 final int length = byteBuf.readableBytes(); 16 bytes = new byte[length]; 17 byteBuf.getBytes(byteBuf.readerIndex(),bytes,0,length); 18 // 調用MessagePack 的read方法將其反序列化為Object對象 19 MessagePack msgPack = new MessagePack(); 20 list.add(msgPack.read(bytes)); 21 } 22 }
如果實現的接口後面沒有添加泛型<ByteBuf>,則解碼實現接口方法時直接生成的方法的參數如下:
1 @Override 2 protected void decode(ChannelHandlerContext channelHandlerContext, Object o, List list) throws Exception { 3 4 }
不過經過實驗,將接收到的Object進行轉換後仍然可以,要記好netty接收和傳遞信息都是經過ByteBuf進行的
1 @Override 2 protected void decode(ChannelHandlerContext channelHandlerContext, Object o, List list) throws Exception { 3 ByteBuf byteBuf = (ByteBuf) o; 4 // 獲取要解碼的byte數組 5 final byte[] bytes; 6 final int length = byteBuf.readableBytes(); 7 bytes = new byte[length]; 8 byteBuf.getBytes(byteBuf.readerIndex(),bytes,0,length); 9 // 調用MessagePack 的read方法將其反序列化為Object對象 10 MessagePack msgPack = new MessagePack(); 11 list.add(msgPack.read(bytes)); 12 }
三、服務端代碼
1 package com.messagePack; 2 3 import io.netty.bootstrap.ServerBootstrap; 4 import io.netty.channel.ChannelFuture; 5 import io.netty.channel.ChannelInitializer; 6 import io.netty.channel.ChannelOption; 7 import io.netty.channel.nio.NioEventLoopGroup; 8 import io.netty.channel.socket.SocketChannel; 9 import io.netty.channel.socket.nio.NioServerSocketChannel; 10 import io.netty.handler.logging.LogLevel; 11 import io.netty.handler.logging.LoggingHandler; 12 13 public class EchoServer { 14 public void bind(int port) throws InterruptedException { 15 NioEventLoopGroup bossGroup = new NioEventLoopGroup(); 16 NioEventLoopGroup workGroup = new NioEventLoopGroup(); 17 try { 18 ServerBootstrap b = new ServerBootstrap(); 19 b.group(bossGroup,workGroup) 20 .channel(NioServerSocketChannel.class) 21 .option(ChannelOption.SO_BACKLOG,1024) 22 .childHandler(new LoggingHandler(LogLevel.INFO)) 23 .childHandler(new ChannelInitializer<SocketChannel>() { 24 @Override 25 protected void initChannel(SocketChannel socketChannel) throws Exception { 26 socketChannel.pipeline() 27 .addLast("decoder",new MsgPackDecoder()) 28 .addLast("encoder",new MsgPackEncoder()) 29 .addLast(new EchoServerHandler()); 30 } 31 }); 32 // 綁定端口,同步等待成功 33 ChannelFuture f = b.bind(port).sync(); 34 // 等待服務端監聽端口關閉 35 f.channel().closeFuture().sync(); 36 } finally { 37 bossGroup.shutdownGracefully(); 38 workGroup.shutdownGracefully(); 39 } 40 } 41 public static void main(String[] args) throws InterruptedException { 42 int port = 8080; 43 if(args.length>0&&args!=null){ 44 port = Integer.parseInt(args[0]); 45 } 46 new EchoServer().bind(port); 47 48 } 49 }
IO處理
1 package com.messagePack; 2 3 import io.netty.buffer.ByteBuf; 4 import io.netty.buffer.Unpooled; 5 import io.netty.channel.ChannelHandlerContext; 6 import io.netty.channel.ChannelInboundHandlerAdapter; 7 8 import java.util.List; 9 10 public class EchoServerHandler extends ChannelInboundHandlerAdapter { 11 int count; 12 @Override 13 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 14 System.out.println("server receive the msgpack message : "+msg+""); 15 // 原路返回給客戶端 16 ctx.writeAndFlush(msg); 17 /* 在EchoClientHandler中向服務端發送一個pojo對象,經過MessagePack編解碼後, 18 在EchoServerHandler中的channelRead方法中打印的msg為pojo對象的toString方法內容, 19 不可以直接將msg轉換為User,如果采用如下代碼運行不成功*/ 20 /* List<User> users = (List<User>) msg; 21 System.out.println("到這裏面來了,users是否為空:"); 22 System.out.println(users!=null); 23 for(User u : users){ 24 System.out.println("This is"+ ++count +" times server receive client request."+u); 25 ctx.write(u); 26 } 27 28 ctx.flush();*/ 29 } 30 31 @Override 32 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { 33 ctx.flush(); 34 } 35 36 @Override 37 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { 38 ctx.close(); 39 } 40 }
坑二、這裏的坑比較大,在上述代碼註釋中已經說明了,下面運行結果是註釋掉以上14~16行代碼,放開20~28行代碼時候運行結果,結果表明收到的消息轉化為User數組時候,是空的,但是後臺並沒有報錯,不知道為什麽在我的IDEA上運行不下去但是不報錯
四、客戶端代碼
1 package com.messagePack; 2 3 import io.netty.bootstrap.Bootstrap; 4 import io.netty.buffer.ByteBuf; 5 import io.netty.buffer.Unpooled; 6 import io.netty.channel.ChannelFuture; 7 import io.netty.channel.ChannelInitializer; 8 import io.netty.channel.ChannelOption; 9 import io.netty.channel.nio.NioEventLoopGroup; 10 import io.netty.channel.socket.SocketChannel; 11 import io.netty.channel.socket.nio.NioSocketChannel; 12 import io.netty.handler.codec.DelimiterBasedFrameDecoder; 13 import io.netty.handler.codec.string.StringDecoder; 14 15 public class EchoClient { 16 public void connection(int port,String host) throws InterruptedException { 17 NioEventLoopGroup workGroup = new NioEventLoopGroup(); 18 try { 19 Bootstrap b = new Bootstrap(); 20 b.group(workGroup) 21 .channel(NioSocketChannel.class) 22 .option(ChannelOption.TCP_NODELAY,true) 23 .option(ChannelOption.CONNECT_TIMEOUT_MILLIS,3000) 24 .handler(new ChannelInitializer<SocketChannel>() { 25 @Override 26 protected void initChannel(SocketChannel socketChannel) throws Exception { 27 socketChannel.pipeline().addLast("msgpack decoder",new MsgPackDecoder()) 28 .addLast("msgpack encoder",new MsgPackEncoder()) 29 .addLast(new EchoClientHandler()); 30 // 31 } 32 }); 33 // 發起異步連接操作 34 ChannelFuture f = b.connect(host,port).sync(); 35 // 等待客戶端鏈路關閉 36 f.channel().closeFuture().sync(); 37 } finally { 38 workGroup.shutdownGracefully(); 39 } 40 } 41 public static void main(String[] args) throws InterruptedException { 42 int port = 8080; 43 if(args.length>0&&args!=null){ 44 System.out.println(args[0]); 45 port = Integer.parseInt(args[0]); 46 } 47 new EchoClient().connection(port,"127.0.0.1"); 48 } 49 }
IO處理類
1 package com.messagePack; 2 3 import io.netty.channel.ChannelHandlerContext; 4 import io.netty.channel.ChannelInboundHandlerAdapter; 5 6 public class EchoClientHandler extends ChannelInboundHandlerAdapter { 7 private int count; 8 9 @Override 10 public void channelActive(ChannelHandlerContext ctx) throws Exception { 11 /* User user = getUser(); 12 ctx.writeAndFlush(user);*/ 13 User[] users = getUsers(); 14 for(User u : users){ 15 ctx.write(u); 16 } 17 ctx.flush(); 18 } 19 20 @Override 21 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 22 System.out.println("this is client receive msg【 "+ ++count +" 】times:【"+msg+"】"); 23 if(count<5){ //控制運行次數,因為不加這個控制直接調用下面代碼的話,客戶端和服務端會形成閉環循環,一直運行 24 ctx.write(msg); 25 } 26 } 27 28 @Override 29 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { 30 ctx.flush(); 31 } 32 33 @Override 34 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { 35 ctx.close(); 36 } 37 private User[] getUsers(){ 38 User[] users = new User[5]; 39 for(int i=0;i<5;i++){ 40 User user = new User(); 41 user.setId(String.valueOf(i)); 42 user.setAge(18+i); 43 user.setName("張元"+i); 44 user.setSex("男"+String.valueOf(i*2)); 45 users[i]=user; 46 } 47 return users; 48 } 49 50 private User getUser(){ 51 User user = new User(); 52 user.setId("11"); 53 user.setAge(18); 54 user.setName("張元"); 55 user.setSex("男"); 56 return user; 57 } 58 }
五、User代碼
坑三、要傳輸的javabean一定要加上註解@message
1 package com.messagePack; 2 3 import org.msgpack.annotation.Message; 4 5 @Message 6 public class User { 7 private String name; 8 private int age; 9 private String id; 10 private String sex; 11 12 public int getAge() { 13 return age; 14 } 15 16 public void setAge(int age) { 17 this.age = age; 18 } 19 20 public String getId() { 21 return id; 22 } 23 24 public void setId(String id) { 25 this.id = id; 26 } 27 28 public String getSex() { 29 return sex; 30 } 31 32 public void setSex(String sex) { 33 this.sex = sex; 34 } 35 36 public String getName() { 37 return name; 38 } 39 40 public void setName(String name) { 41 this.name = name; 42 } 43 44 @Override 45 public String toString() { 46 return "User{" + 47 "name=‘" + name + ‘\‘‘ + 48 ", age=" + age + 49 ", id=‘" + id + ‘\‘‘ + 50 ", sex=‘" + sex + ‘\‘‘ + 51 ‘}‘; 52 } 53 }
六、運行結果
客戶端
服務端
七、上面運行的結果發現打印的數據都完全一樣,這是因為沒有考慮粘包/半包的處理,還不能正常工作,下面我們利用Netty的LengthFieldPrepender和LengthFieldBasedFrameDecoder,來解決上述問題,這裏只需要對客戶端和服務端添加相關的處理類就可以了,改動代碼如下:
客戶端
1 .handler(new ChannelInitializer<SocketChannel>() { 2 @Override 3 protected void initChannel(SocketChannel socketChannel) throws Exception { 4 socketChannel.pipeline() 5 .addLast("frameDecoder",new LengthFieldBasedFrameDecoder(65535, 6 0,4,0,4)) 7 .addLast("msgpack decoder",new MsgPackDecoder()) 8 .addLast("frameEncoder",new LengthFieldPrepender(4)) 9 .addLast("msgpack encoder",new MsgPackEncoder()) 10 .addLast(new EchoClientHandler()); 11 // 12 } 13 });
服務端
1 .childHandler(new ChannelInitializer<SocketChannel>() { 2 @Override 3 protected void initChannel(SocketChannel socketChannel) throws Exception { 4 socketChannel.pipeline() 5 .addLast("framDecoder",new LengthFieldBasedFrameDecoder(65535, 6 0,4,0,4)) 7 .addLast("decoder",new MsgPackDecoder()) 8 .addLast("frameEncoder",new LengthFieldPrepender(4)) 9 .addLast("encoder",new MsgPackEncoder()) 10 .addLast(new EchoServerHandler()); 11 } 12 });
其實兩者改動的地方完全一樣,下面看一下運行效果
客戶端
服務端
這次運行結果顯示的是0~5的數據,但是遺憾的是不知道為什麽有重復了一下,而且之重復了部分的運行,暫時不理會。以後應用多了可能就明白了。或者有路過的朋友知道的留個言,謝謝!
netty權威指南學習筆記六——編解碼技術之MessagePack