Netty 整合 MessagePack 序列化框架 + LengthFieldBasedFrameDecoder 自定義解碼器
環境準備及說明
如果是匯入二進位制開發包,則如下所示:
需要開發包的可以參考《 MessagePack 開發入門詳解》。
如果是 Maven 專案,則新增如下依賴:
<!-- https://mvnrepository.com/artifact/io.netty/netty-all --> <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.30.Final</version> </dependency> <!-- https://mvnrepository.com/artifact/org.msgpack/msgpack --> <dependency> <groupId>org.msgpack</groupId> <artifactId>msgpack</artifactId> <version>0.6.12</version> </dependency> <!-- https://mvnrepository.com/artifact/org.javassist/javassist --> <dependency> <groupId>org.javassist</groupId> <artifactId>javassist</artifactId> <version>3.24.0-GA</version> </dependency>
1)netty-all:是 Netty 開發包
2)msgpack:是 Messagepck 序列化開發包
3)javassist:是 msgpack 自己的依賴包
本文示例專案結構如下:
1)User:網路傳輸的 POJO 物件,注意:序列化 POJO 必須加 org.msgpack.annotation.Message 註解:@Message
2)echo:包中為 netty 通訊的客戶端與服務端
3)messagepack:包中為 MessagePack 編解碼器
特別提醒:
1)雖然 MessagePack 用於序列化物件,但是普通 String、Integer 等等同樣也是物件,所以照樣可以傳輸普通的字串等訊息
2)需要序列化的 POJO 物件上必須加上 org.msgpack.annotation.Message 註解:@Message,否則傳輸會失敗,而且也不報錯,很難排查
3)MessagePack 序列化物件後的訊息,經過傳送後,接收端 channelRead(ChannelHandlerContext ctx, Object msg)
3.1)即使傳送的是 User 物件,接收端的 msg 也不能進行 User user = (User)msg 強轉,否則客戶端會被強制斷開連線
3.2)如果傳送的是 User 物件,接收端可以轉為 List<Object> objects = (List<Object>) msg,list 中的元素對應 User 的屬性值
3.3)如果傳送的不是 POJO 物件,而是簡單的 String 物件,則不能轉為 List<Object>,否則客戶端也會被強制斷開
MessagePack 編解碼器
利用 Netty 的編解碼框架可以非常方便的整合第三方序列化框架,Netty 預集成了幾種常用的編解碼框架,使用者也可以根據自己專案的實際情況整合其它編解碼框架,或者進行自定義。
MessagePack 編碼器
package com.example.messagepack;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import org.msgpack.MessagePack;
/**
* Created by Administrator on 2018/11/25 0025.
* MessagePack 編碼器 —— 繼承 Netty 的 MessageToByteEncoder,比重寫方法
*/
public class MsgpackEncoder extends MessageToByteEncoder<Object> {
/**
* 重寫方法,負責將 Object 型別的 POJO 物件編碼為 byte 陣列,然後寫入 ByteBuf 中
*
* @param channelHandlerContext
* @param o
* @param byteBuf
* @throws Exception
*/
@Override
protected void encode(ChannelHandlerContext channelHandlerContext, Object o, ByteBuf byteBuf) throws Exception {
MessagePack messagePack = new MessagePack();
/** 序列化物件*/
byte[] raw = messagePack.write(o);
byteBuf.writeBytes(raw);
}
}
MessagePack 解碼器
package com.example.messagepack;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;
import org.msgpack.MessagePack;
import java.util.List;
/**
* Created by Administrator on 2018/11/25 0025.
* MessagePack 解碼器 - 繼承 Netty 的 MessageToMessageDecoder,並重寫方法
*/
public class MsgpackDecoder extends MessageToMessageDecoder<ByteBuf> {
/**
* 重寫方法,首先從資料報 byteBuf 中獲取需要解碼的 byte 陣列,
* 然後呼叫 MessagePack 的 read 方法將其反序列化為 Object 物件,將解碼後的物件加入到解碼列表 list 中,
* 這樣就完成了 MessagePack 的解碼操作
*
* @param channelHandlerContext
* @param byteBuf
* @param list
* @throws Exception
*/
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
int length = byteBuf.readableBytes();
byte[] array = new byte[length];
byteBuf.getBytes(byteBuf.readerIndex(), array, 0, length);
MessagePack messagePack = new MessagePack();
list.add(messagePack.read(array));
}
}
POJO User
package com.example.domain;
import org.msgpack.annotation.Message;
import java.util.Date;
/**
* Created by Administrator on 2018/11/25 0025.
* 使用者 實體
* 需要序列化的 POJO 物件上必須加上 org.msgpack.annotation.Message 註解:@Message
*/
@Message
public class User {
private Integer pId;
private String pName;
private Date birthday;
private Boolean isMarry;
public Date getBirthday() {
return birthday;
}
public void setBirthday(Date birthday) {
this.birthday = birthday;
}
public Integer getpId() {
return pId;
}
public void setpId(Integer pId) {
this.pId = pId;
}
public String getpName() {
return pName;
}
public void setpName(String pName) {
this.pName = pName;
}
public Boolean getIsMarry() {
return isMarry;
}
public void setIsMarry(Boolean isMarry) {
this.isMarry = isMarry;
}
@Override
public String toString() {
return "User{" +
"birthday=" + birthday +
", pId=" + pId +
", pName='" + pName + '\'' +
", isMarry=" + isMarry +
'}';
}
}
Netty 網路通訊
首先模擬的情況是:客戶端連線上伺服器後,給伺服器連發訊息,伺服器接收後會將原資訊進回覆,同時會解決 TCP 粘包與拆包。會使用 Netty 的 LengthFieldPrepender、LengthFieldBasedFrameDecoder 編解碼器處理半包訊息,不會出現 TCP 粘包/拆包。
服務端
EchoServer 內容如下:
package com.example.echo;
import com.example.messagepack.MsgpackDecoder;
import com.example.messagepack.MsgpackEncoder;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
/**
* Created by Administrator on 2018/11/11 0011.
* Echo 伺服器
*/
public class EchoServer {
public static void main(String[] args) {
int port = 9898;
new EchoServer().bind(port);
}
public void bind(int port) {
/**
* interface EventLoopGroup extends EventExecutorGroup extends ScheduledExecutorService extends ExecutorService
* 配置服務端的 NIO 執行緒池,用於網路事件處理,實質上他們就是 Reactor 執行緒組
* bossGroup 用於服務端接受客戶端連線,workerGroup 用於進行 SocketChannel 網路讀寫*/
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
/** ServerBootstrap 是 Netty 用於啟動 NIO 服務端的輔助啟動類,用於降低開發難度
* */
final ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
// 設定TCP連線超時時間
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
System.out.println(Thread.currentThread().getName() + ",伺服器初始化通道...");
/**
* 為了處理半包訊息,新增如下兩個 Netty 內建的編解碼器
* LengthFieldPrepender:前置長度域編碼器——放在MsgpackEncoder編碼器前面
* LengthFieldBasedFrameDecoder:長度域解碼器——放在MsgpackDecoder解碼器前面
* 關於 長度域編解碼器處理半包訊息,本文不做詳細講解,會有專門篇章進行說明
*/
ch.pipeline().addLast("frameEncoder", new LengthFieldPrepender(2));
ch.pipeline().addLast("MessagePack encoder", new MsgpackEncoder());
ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(65535, 0, 2, 0, 2));
ch.pipeline().addLast("MessagePack Decoder", new MsgpackDecoder());
ch.pipeline().addLast(new EchoServerHandler());
}
});
/**伺服器啟動輔助類配置完成後,呼叫 bind 方法繫結監聽埠,呼叫 sync 方法同步等待繫結操作完成*/
ChannelFuture f = b.bind(port).sync();
System.out.println(Thread.currentThread().getName() + ",伺服器開始監聽埠,等待客戶端連線.........");
/**下面會進行阻塞,等待伺服器連線關閉之後 main 方法退出,程式結束* */
f.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
/**優雅退出,釋放執行緒池資源*/
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
EchoServerHandler 內容如下:
package com.example.echo;
import com.example.domain.User;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Created by Administrator on 2017/5/16.
* ChannelInboundHandlerAdapter extends ChannelHandlerAdapter 用於對網路事件進行讀寫操作
*/
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
/**
* 因為多執行緒,所以使用原子操作類來進行計數
*/
private static AtomicInteger atomicInteger = new AtomicInteger();
/**
* 收到客戶端訊息,自動觸發
*
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println((atomicInteger.addAndGet(1)) + "--->" + Thread.currentThread().getName() + ",The server receive order : " + msg);
/**
* 如果傳輸的是 POJO 物件,則可以轉成 List<Object>
* list 中的每一個元素都是傳送來的 POJO 物件的屬性值
* 注意:如果對方傳輸只是簡單的 String 物件,則不能強轉為 List<Object>
*/
/* List<Object> objects = (List<Object>) msg;
for (Object obj : objects) {
System.out.println("屬性:" + obj);
}*/
/**
* 服務端接收到客戶端傳送來的資料後,再回發給客戶端
*/
ctx.writeAndFlush(msg);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("-----客戶端關閉:" + ctx.channel().remoteAddress());
/**當發生異常時,關閉 ChannelHandlerContext,釋放和它相關聯的控制代碼等資源 */
ctx.close();
}
}
客戶端
EchoClient 內容如下:
package com.example.echo;
import com.example.messagepack.MsgpackDecoder;
import com.example.messagepack.MsgpackEncoder;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.string.StringDecoder;
/**
* Created by Administrator on 2017/5/16.
* Echo 客戶端
*/
public class EchoClient {
/**
* 使用 2 個執行緒模擬 2 個客戶端
*
* @param args
*/
public static void main(String[] args) {
for (int i = 0; i < 2; i++) {
new Thread(new MyThread()).start();
}
}
static class MyThread implements Runnable {
@Override
public void run() {
connect("192.168.1.20", 9898);
}
public void connect(String host, int port) {
/**配置客戶端 NIO 執行緒組/池*/
EventLoopGroup group = new NioEventLoopGroup();
try {
/**Bootstrap 與 ServerBootstrap 都繼承(extends)於 AbstractBootstrap
* 建立客戶端輔助啟動類,並對其配置,與伺服器稍微不同,這裡的 Channel 設定為 NioSocketChannel
* 然後為其新增 Handler,這裡直接使用匿名內部類,實現 initChannel 方法
* 作用是當建立 NioSocketChannel 成功後,在進行初始化時,將它的ChannelHandler設定到ChannelPipeline中,用於處理網路I/O事件*/
Bootstrap b = new Bootstrap();
b.group(group).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
// 設定TCP連線超時時間
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
System.out.println(Thread.currentThread().getName() + ",客戶端初始化管道...");
/**
* 為了處理半包訊息,新增如下兩個 Netty 內建的編解碼器
* LengthFieldPrepender:前置長度域編碼器——放在MsgpackEncoder編碼器前面
* LengthFieldBasedFrameDecoder:長度域解碼器——放在MsgpackDecoder解碼器前面
* 關於 長度域編解碼器處理半包訊息,本文不做詳細講解,會有專門篇章進行說明
*/
ch.pipeline().addLast("frameEncoder", new LengthFieldPrepender(2));
ch.pipeline().addLast("MessagePack encoder", new MsgpackEncoder());
ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(65535, 0, 2, 0, 2));
ch.pipeline().addLast("MessagePack Decoder", new MsgpackDecoder());
ch.pipeline().addLast(new EchoClientHandler());
}
});
/**connect:發起非同步連線操作,呼叫同步方法 sync 等待連線成功*/
ChannelFuture channelFuture = b.connect(host, port).sync();
System.out.println(Thread.currentThread().getName() + ",客戶端發起非同步連線..........");
/**等待客戶端鏈路關閉*/
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
/**優雅退出,釋放NIO執行緒組*/
group.shutdownGracefully();
}
}
}
}
EchoClientHandler 內容如下:
package com.example.echo;
import com.example.domain.User;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Created by Administrator on 2017/5/17.
* 用於對網路事件進行讀寫操作
*/
public class EchoClientHandler extends ChannelInboundHandlerAdapter {
/**
* 因為 Netty 採用執行緒池,所以這裡使用原子操作類來進行計數
*/
private static AtomicInteger atomicInteger = new AtomicInteger();
/**
* 當客戶端和服務端 TCP 鏈路建立成功之後,Netty 的 NIO 執行緒會呼叫 channelActive 方法
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
/**
* 多餘 陣列、List、Set、Map 等,對立面的元素逐個進行傳送,則對方也是逐個接收
* 否則如果直接傳送 陣列、List、Set、Map 等,則對方會統一接收
* 注意:因為使用LengthFieldPrepender、LengthFieldBasedFrameDecoder編解碼器處理半包訊息
* 所以這裡連續傳送也不會出現 TCP 粘包/拆包
*/
List<User> users = getUserArrayData();
for (User user : users) {
ctx.writeAndFlush(user);
}
ctx.writeAndFlush("我是普通的字串訊息" + Thread.currentThread().getName());
}
/**
* 當服務端返回應答訊息時,channelRead 方法被呼叫,從 Netty 的 ByteBuf 中讀取並列印應答訊息
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println((atomicInteger.addAndGet(1)) + "---" + Thread.currentThread().getName() + ",Server return Message:" + msg);
}
/**
* 當發生異常時,列印異常 日誌,釋放客戶端資源
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
/**釋放資源*/
ctx.close();
}
/**
* 設定網路傳輸的 POJO 物件陣列/列表
*
* @return
*/
public List<User> getUserArrayData() {
User[] users = new User[5];
User loopUser = null;
for (int i = 0; i < 5; i++) {
loopUser = new User();
loopUser.setpId(i + 1);
loopUser.setpName("華安" + Thread.currentThread().getName());
loopUser.setIsMarry(true);
loopUser.setBirthday(new Date());
users[i] = loopUser;
}
return Arrays.asList(users);
}
}
執行測試
先執行伺服器,再執行客戶端。