開發基於protostuff編解碼技術的Netty程序:傳輸pojo對象
開發基於protostuff編解碼技術的Netty程序:傳輸pojo對象
前言
這次開發的Netty程序主要是在網絡中傳輸Java對象,傳輸的對象不僅限於字符串,也可以是自定義的其它類型對象。
前面使用protostuff都是比較單純地使用,進行簡單的一些測試,下面要完成的這個例子功能雖然不復雜,但相對使用起來會比較綜合一些。通過序列化工具類的開發、編解碼器的開發,然後將其應用到我們的Netty程序當中。
開發這個Netty程序來傳輸pojo對象是為了後面進行遠程過程調用框架的開發做一定的準備,因為在遠程調用時,返回的結果就是一個對象,返回的對象類型取決於調用的方法,所以通過Netty程序來傳輸pojo對象只是開發自定義RPC框架中的一小部分,但卻也是十分重要的一部分。
另外需要註意的是,Netty框架本身的使用很重要,而怎麽去開發序列化工具類(即如何通過protostuff來開發具有能用性的序列化工具)、如何基於序列化工具類開發Netty的編碼器與解碼器、如何在Netty中使用自定義開發的編碼器與解碼器等這些知識都是十重要,而且也是一定要掌握的。
代碼中都已經寫好了註釋,程序是可以直接跑起來的,依賴的相關包,因為使用的是maven工程,所以在後面也會給出pom.xml文件的內容。
protostuff序列化工具類開發
工具類的開發過程可以參考前面的文章基於protostuff的序列化工具類開發,下面直接給出具有緩存功能的序列化工具類的代碼:
SerializationUtil2.java
package cn.xpleaf.protostuff.netty.utils; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import com.dyuproject.protostuff.LinkedBuffer; import com.dyuproject.protostuff.ProtostuffIOUtil; import com.dyuproject.protostuff.runtime.RuntimeSchema; /** * 具備緩存功能的序列化工具類,基於Protostuff實現(其基於Google Protobuf實現) * * @author yeyonghao * */ public class SerializationUtil2 { // 緩存schema對象的map private static Map<Class<?>, RuntimeSchema<?>> cachedSchema = new ConcurrentHashMap<Class<?>, RuntimeSchema<?>>(); /** * 根據獲取相應類型的schema方法 * * @param clazz * @return */ @SuppressWarnings({ "unchecked", "unused" }) private <T> RuntimeSchema<T> getSchema(Class<T> clazz) { // 先嘗試從緩存schema map中獲取相應類型的schema RuntimeSchema<T> schema = (RuntimeSchema<T>) cachedSchema.get(clazz); // 如果沒有獲取到對應的schema,則創建一個該類型的schema // 同時將其添加到schema map中 if (schema == null) { schema = RuntimeSchema.createFrom(clazz); if (schema != null) { cachedSchema.put(clazz, schema); } } // 返回schema對象 return schema; } /** * 序列化方法,將對象序列化為字節數組(對象 ---> 字節數組) * * @param obj * @return */ @SuppressWarnings("unchecked") public static <T> byte[] serialize(T obj) { // 獲取泛型對象的類型 Class<T> clazz = (Class<T>) obj.getClass(); // 創建泛型對象的schema對象 RuntimeSchema<T> schema = RuntimeSchema.createFrom(clazz); // 創建LinkedBuffer對象 LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE); // 序列化 byte[] array = ProtostuffIOUtil.toByteArray(obj, schema, buffer); // 返回序列化對象 return array; } /** * 反序列化方法,將字節數組反序列化為對象(字節數組 ---> 對象) * * @param data * @param clazz * @return */ public static <T> T deserialize(byte[] data, Class<T> clazz) { // 創建泛型對象的schema對象 RuntimeSchema<T> schema = RuntimeSchema.createFrom(clazz); // 根據schema實例化對象 T message = schema.newMessage(); // 將字節數組中的數據反序列化到message對象 ProtostuffIOUtil.mergeFrom(data, message, schema); // 返回反序列化對象 return message; } }
編碼器與解碼器開發
EchoEncoder.java
package cn.xpleaf.protostuff.netty.utils;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
/**
* PojoEncoder繼承自Netty中的MessageToByteEncoder類,
* 並重寫抽象方法encode(ChannelHandlerContext ctx, Object msg, ByteBuf out)
* 它負責將Object類型的POJO對象編碼為byte數組,然後寫入到ByteBuf中
*
* @author yeyonghao
*
*/
public class EchoEncoder extends MessageToByteEncoder<Object> {
@Override
protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
// 直接生成序列化對象
// 需要註意的是,使用protostuff序列化時,不需要知道pojo對象的具體類型也可以進行序列化時
// 在反序列化時,只要提供序列化後的字節數組和原來pojo對象的類型即可完成反序列化
byte[] array = SerializationUtil2.serialize(msg);
out.writeBytes(array);
}
}
EchoDecoder.java
package cn.xpleaf.protostuff.netty.utils;
import java.util.List;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;
/**
* PojoDecoder繼承自Netty中的MessageToMessageDecoder類,
* 並重寫抽象方法decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out)
* 首先從數據報msg(數據類型取決於繼承MessageToMessageDecoder時填寫的泛型類型)中獲取需要解碼的byte數組
* 然後調用使用序列化工具類將其反序列化(解碼)為Object對象 將解碼後的對象加入到解碼列表out中,這樣就完成了解碼操作
*
* @author yeyonghao
*
*/
public class EchoDecoder extends MessageToMessageDecoder<ByteBuf> {
// 需要反序列對象所屬的類型
private Class<?> genericClass;
// 構造方法,傳入需要反序列化對象的類型
public EchoDecoder(Class<?> genericClass) {
this.genericClass = genericClass;
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
// ByteBuf的長度
int length = msg.readableBytes();
// 構建length長度的字節數組
byte[] array = new byte[length];
// 將ByteBuf數據復制到字節數組中
msg.readBytes(array);
// 反序列化對象
Object obj = SerializationUtil2.deserialize(array, this.genericClass);
// 添加到反序列化對象結果列表
out.add(obj);
}
}
Netty服務端程序開發
EchoServer.java
package cn.xpleaf.protostuff.netty.echoservice;
import cn.xpleaf.protostuff.netty.pojo.EchoRequest;
import cn.xpleaf.protostuff.netty.utils.EchoDecoder;
import cn.xpleaf.protostuff.netty.utils.EchoEncoder;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
public class EchoServer {
public void bind(int port) throws Exception {
// 配置服務端NIO線程組
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// 添加編碼器
ch.pipeline().addLast(new EchoDecoder(EchoRequest.class));
// 添加解碼器
ch.pipeline().addLast(new EchoEncoder());
// 添加業務處理handler
ch.pipeline().addLast(new EchoServerHandler());
}
});
// 綁定端口,同步等待成功,該方法是同步阻塞的,綁定成功後返回一個ChannelFuture
ChannelFuture f = b.bind(port).sync();
// 等待服務端監聽端口關閉,阻塞,等待服務端鏈路關閉之後main函數才退出
f.channel().closeFuture().sync();
} finally {
// 優雅退出,釋放線程池資源
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
int port = 8080;
if(args != null && args.length > 0) {
try {
port = Integer.valueOf(port);
} catch (NumberFormatException e) {
// TODO: handle exception
}
}
new EchoServer().bind(port);
}
}
EchoServerHandler.java
package cn.xpleaf.protostuff.netty.echoservice;
import java.util.UUID;
import cn.xpleaf.protostuff.netty.pojo.EchoRequest;
import cn.xpleaf.protostuff.netty.pojo.EchoResponse;
import cn.xpleaf.protostuff.netty.pojo.User;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 接收到的對象的類型為EchoRequest
EchoRequest req = (EchoRequest) msg;
System.out.println(req.getRequestId() + " : " + req.getRequestObj());
// 創建需要傳輸的user對象
User user = new User();
user.setName("server");
user.setAge(10);
// 創建傳輸的user對象載體EchoRequest對象
EchoResponse resp = new EchoResponse();
// 設置responseId
resp.setResponseId(UUID.randomUUID().toString());
// 設置需要傳輸的對象
resp.setResponseObj(user);
// 設置需要傳輸的對象的類型
resp.setResponseObjClass(resp.getResponseObj().getClass());
// 調用writeAndFlush將數據發送到socketChannel
ctx.writeAndFlush(resp);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
ctx.close();
}
}
Netty客戶端程序開發
EchoClient.java
package cn.xpleaf.protostuff.netty.echoservice;
import cn.xpleaf.protostuff.netty.pojo.EchoResponse;
import cn.xpleaf.protostuff.netty.utils.EchoDecoder;
import cn.xpleaf.protostuff.netty.utils.EchoEncoder;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
public class EchoClient {
public void connect(int port, String host) throws Exception {
// 配置客戶端NIO線程組
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
// 設置TCP連接超時時間
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// 添加解碼器
ch.pipeline().addLast(new EchoDecoder(EchoResponse.class));
// 添加編碼器
ch.pipeline().addLast(new EchoEncoder());
// 添加業務處理handler
ch.pipeline().addLast(new EchoClientHandler());
}
});
// 發起異步連接操作(註意服務端是bind,客戶端則需要connect)
ChannelFuture f = b.connect(host, port).sync();
// 等待客戶端鏈路關閉
f.channel().closeFuture().sync();
} finally {
// 優雅退出,釋放NIO線程組
group.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
int port = 8080;
if(args != null && args.length > 0) {
try {
port = Integer.valueOf(port);
} catch (NumberFormatException e) {
// 采用默認值
}
}
new EchoClient().connect(port, "localhost");
}
}
EchoClientHandler.java
package cn.xpleaf.protostuff.netty.echoservice;
import java.util.UUID;
import cn.xpleaf.protostuff.netty.pojo.EchoRequest;
import cn.xpleaf.protostuff.netty.pojo.EchoResponse;
import cn.xpleaf.protostuff.netty.pojo.User;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
public class EchoClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) {
// 創建需要傳輸的user對象
User user = new User();
user.setName("client");
user.setAge(10);
// 創建傳輸的user對象載體EchoRequest對象
EchoRequest req = new EchoRequest();
// 設置requestId
req.setRequestId(UUID.randomUUID().toString());
// 設置需要傳輸的對象
req.setRequestObj(user);
// 設置需要傳輸的對象的類型
req.setRequestObjClass(req.getRequestObj().getClass());
// 調用writeAndFlush將數據發送到socketChannel
ctx.writeAndFlush(req);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 接收到的對象的類型為EchoResponse
EchoResponse resp = (EchoResponse) msg;
System.out.println(resp.getResponseId() + " : " + resp.getResponseObj());
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
POJO
EchoRequest.java
package cn.xpleaf.protostuff.netty.pojo;
/**
* EchoRequest是client向server端發送數據的傳輸載體,將需要進行傳輸的pojo對象統一封裝到EchoRequest對象中,
* 這樣會為編解碼工作帶來很大的方便性和統一性,同時也可以攜帶其它信息, 對於後面對程序進行擴展會有非常大的幫助
*
* @author yeyonghao
*
*/
public class EchoRequest {
private String requestId;
private Object requestObj;
private Class<?> requestObjClass;
public String getRequestId() {
return requestId;
}
public void setRequestId(String requestId) {
this.requestId = requestId;
}
public Object getRequestObj() {
return requestObj;
}
public void setRequestObj(Object requestObj) {
this.requestObj = requestObj;
}
public Class<?> getRequestObjClass() {
return requestObjClass;
}
public void setRequestObjClass(Class<?> requestObjClass) {
this.requestObjClass = requestObjClass;
}
}
EchoResponse.java
package cn.xpleaf.protostuff.netty.pojo;
/**
* EchoResponse是server向client端發送數據的傳輸載體,將需要進行傳輸的pojo對象統一封裝到EchoResponse對象中,
* 這樣會為編解碼工作帶來很大的方便性和統一性,同時也可以攜帶其它信息, 對於後面對程序進行擴展會有非常大的幫助
*
* @author yeyonghao
*
*/
public class EchoResponse {
private String responseId;
private Object responseObj;
private Class<?> responseObjClass;
public String getResponseId() {
return responseId;
}
public void setResponseId(String responseId) {
this.responseId = responseId;
}
public Object getResponseObj() {
return responseObj;
}
public void setResponseObj(Object responseObj) {
this.responseObj = responseObj;
}
public Class<?> getResponseObjClass() {
return responseObjClass;
}
public void setResponseObjClass(Class<?> responseObjClass) {
this.responseObjClass = responseObjClass;
}
}
User.java
package cn.xpleaf.protostuff.netty.pojo;
public class User {
private String name;
private int age;
public User() {
}
public User(String name, int age) {
this.name = name;
this.age = age;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
@Override
public String toString() {
return "User [name=" + name + ", age=" + age + "]";
}
}
測試
分別執行EchoServer.java
和EchoClient.java
,服務端和客戶端輸出如下:
服務端:
4b76d70d-7a31-4738-8daa-ca4f40483e7e : User [name=client, age=10]
客戶端:
e40b6e34-33a3-485e-bb8f-7157ee324e97 : User [name=server, age=10]
附錄:pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>cn.xpleaf</groupId>
<artifactId>Chapter08</artifactId>
<version>0.0.1-SNAPSHOT</version>
<dependencies>
<!-- https://mvnrepository.com/artifact/com.google.protobuf/protobuf-java -->
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.5.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.21.Final</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.dyuproject.protostuff/protostuff-core -->
<dependency>
<groupId>com.dyuproject.protostuff</groupId>
<artifactId>protostuff-core</artifactId>
<version>1.1.3</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.dyuproject.protostuff/protostuff-runtime -->
<dependency>
<groupId>com.dyuproject.protostuff</groupId>
<artifactId>protostuff-runtime</artifactId>
<version>1.1.3</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- java編譯插件 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.2</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
</plugins>
</build>
</project>
存在的問題及解決方案
上面其實傳輸的對象是EchoRequest
和EchoResponse
,雖然其中封裝了返回的對象,但由於其定義的類型為Object類型,在上面的例子當中,其實際上是一個User對象,在這個簡單的例子當中是可以通過類型轉換來進行向下轉型的,但實際使用時,封裝的Object對象不一定是User對象,但又需要做向下轉型,該如何解決這個問題呢?通過動態代理技術可以解決這個問題,後面會進一步改進這個例子使其更具有通用性。
開發基於protostuff編解碼技術的Netty程序:傳輸pojo對象