Netty之傳輸POJO(使用Java自帶的序列化方式)
阿新 • • 發佈:2019-01-25
1、使用Netty傳輸POJO物件,重點在於物件的序列化。序列化的物件通過TCP進行網路傳輸,結合Netty提供的物件編解碼器,可以做到遠端傳輸物件。首先Java需要序列化的物件,需要實現java.io.Serializable介面.
2、工程目錄
2.1 專案的目錄結構
2.2 關於Request和Response的講解
Request是對於,客戶端向服務端的請求資訊的封裝;Response是對於,服務端向客戶端響應資訊的封裝。
3、使用Netty提供的物件編解碼器
3.1 服務端編解碼器的配置
3.2 客戶端編解碼器的配置
3.3 加入物件編解碼後,可以直接傳送物件
4、服務端程式碼
5、服務端處理器程式碼import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.serialization.ClassResolvers; import io.netty.handler.codec.serialization.ObjectDecoder; import io.netty.handler.codec.serialization.ObjectEncoder; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; public class Server { public Server() { } public void bind(int port) throws Exception { // 配置NIO執行緒組 EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { // 伺服器輔助啟動類配置 ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChildChannelHandler())// .option(ChannelOption.SO_BACKLOG, 1024) // 設定tcp緩衝區 // (5) .childOption(ChannelOption.SO_KEEPALIVE, true); // (6) // 繫結埠 同步等待繫結成功 ChannelFuture f = b.bind(port).sync(); // (7) // 等到服務端監聽埠關閉 f.channel().closeFuture().sync(); } finally { // 優雅釋放執行緒資源 workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } /** * 網路事件處理器 */ private class ChildChannelHandler extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel ch) throws Exception { // 新增物件解碼器 負責對序列化POJO物件進行解碼 設定物件序列化最大長度為1M 防止記憶體溢位 // 設定執行緒安全的WeakReferenceMap對類載入器進行快取 支援多執行緒併發訪問 防止記憶體溢位 ch.pipeline().addLast( new ObjectDecoder(1024 * 1024, ClassResolvers .weakCachingConcurrentResolver(this.getClass() .getClassLoader()))); // 新增物件編碼器 在伺服器對外發送訊息的時候自動將實現序列化的POJO物件編碼 ch.pipeline().addLast(new ObjectEncoder()); // 處理網路IO ch.pipeline().addLast(new ServerHandler()); } } public static void main(String[] args) throws Exception { new Server().bind(9999); } }
6、客戶端程式碼import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; public class ServerHandler extends ChannelHandlerAdapter { // 用於獲取客戶端傳送的資訊 @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // 用於獲取客戶端發來的資料資訊 Request body = (Request) msg; System.out.println("Server接受的客戶端的資訊 :" + body.toString()); // 會寫資料給客戶端 Response response = new Response(Integer.parseInt(body.getUrl()), "xiaoming"); // 當服務端完成寫操作後,關閉與客戶端的連線 ctx.writeAndFlush(response); // .addListener(ChannelFutureListener.CLOSE); // 當有寫操作時,不需要手動釋放msg的引用 // 當只有讀操作時,才需要手動釋放msg的引用 } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { // cause.printStackTrace(); ctx.close(); } }
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
public class Client {
/**
* 連線伺服器
*
* @param port
* @param host
* @throws Exception
*/
public void connect(int port, String host) throws Exception {
// 配置客戶端NIO執行緒組
EventLoopGroup group = new NioEventLoopGroup();
try {
// 客戶端輔助啟動類 對客戶端配置
Bootstrap b = new Bootstrap();
b.group(group)//
.channel(NioSocketChannel.class)//
.option(ChannelOption.TCP_NODELAY, true)//
.handler(new MyChannelHandler());//
// 非同步連結伺服器 同步等待連結成功
ChannelFuture f = b.connect(host, port).sync();
// 等待連結關閉
f.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
System.out.println("客戶端優雅的釋放了執行緒資源...");
}
}
/**
* 網路事件處理器
*/
private class MyChannelHandler extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// 新增自定義的編碼器和解碼器
// 新增POJO物件解碼器 禁止快取類載入器
ch.pipeline().addLast(
new ObjectDecoder(1024, ClassResolvers.cacheDisabled(this
.getClass().getClassLoader())));
// 設定傳送訊息編碼器
ch.pipeline().addLast(new ObjectEncoder());
// 處理網路IO
ch.pipeline().addLast(new ClientHandler());// 處理網路IO
}
}
public static void main(String[] args) throws Exception {
new Client().connect(9999, "127.0.0.1");
}
}
7、客戶端處理器程式碼
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.ReferenceCountUtil;
//用於讀取客戶端發來的資訊
public class ClientHandler extends ChannelHandlerAdapter {
// 客戶端與服務端,連線成功的售後
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 傳送訊息
Request request1 = new Request("666");
Request request2 = new Request("777");
Request request3 = new Request("888");
ctx.writeAndFlush(request1);
ctx.writeAndFlush(request2);
Thread.sleep(2000);
ctx.writeAndFlush(request3);
}
// 只是讀資料,沒有寫資料的話
// 需要自己手動的釋放的訊息
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
try {
Response response = (Response) msg;
System.out.println(response);
} finally {
ReferenceCountUtil.release(msg);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
ctx.close();
}
}
8、Request請求資訊(要實現Serializable介面)
import java.io.Serializable;
public class Request implements Serializable {
/**
*
*/
private static final long serialVersionUID = -7033707301911915196L;
private String url;
public Request() {
}
public Request(String url) {
this.url = url;
}
public String getUrl() {
return url;
}
public void setUrl(String url) {
this.url = url;
}
@Override
public String toString() {
return "Request [url=" + url + "]";
}
}
9、Response請求資訊(要實現Serializable介面)
import java.io.Serializable;
public class Response implements Serializable {
/**
*
*/
private static final long serialVersionUID = -6236340795725143988L;
private int age;
private String name;
public Response() {
}
public Response(int age, String name) {
this.age = age;
this.name = name;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
@Override
public String toString() {
return "Response [age=" + age + ", name=" + name + "]";
}
}