1. 程式人生 > >Netty之傳輸POJO(使用Java自帶的序列化方式)

Netty之傳輸POJO(使用Java自帶的序列化方式)

1、使用Netty傳輸POJO物件,重點在於物件的序列化。序列化的物件通過TCP進行網路傳輸,結合Netty提供的物件編解碼器,可以做到遠端傳輸物件。首先Java需要序列化的物件,需要實現java.io.Serializable介面.

2、工程目錄

       2.1 專案的目錄結構

      

        2.2  關於Request和Response的講解

               Request是對於,客戶端向服務端的請求資訊的封裝;Response是對於,服務端向客戶端響應資訊的封裝。

3、使用Netty提供的物件編解碼器

        3.1 服務端編解碼器的配置

             


        3.2 客戶端編解碼器的配置

             

        3.3 加入物件編解碼後,可以直接傳送物件

          

4、服務端程式碼

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

public class Server {

	public Server() {
	}

	public void bind(int port) throws Exception {
		// 配置NIO執行緒組
		EventLoopGroup bossGroup = new NioEventLoopGroup();
		EventLoopGroup workerGroup = new NioEventLoopGroup();
		try {
			// 伺服器輔助啟動類配置
			ServerBootstrap b = new ServerBootstrap();
			b.group(bossGroup, workerGroup)
					.channel(NioServerSocketChannel.class)
					.handler(new LoggingHandler(LogLevel.INFO))
					.childHandler(new ChildChannelHandler())//
					.option(ChannelOption.SO_BACKLOG, 1024) // 設定tcp緩衝區 // (5)
					.childOption(ChannelOption.SO_KEEPALIVE, true); // (6)
			// 繫結埠 同步等待繫結成功
			ChannelFuture f = b.bind(port).sync(); // (7)
			// 等到服務端監聽埠關閉
			f.channel().closeFuture().sync();
		} finally {
			// 優雅釋放執行緒資源
			workerGroup.shutdownGracefully();
			bossGroup.shutdownGracefully();
		}
	}

	/**
	 * 網路事件處理器
	 */
	private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
		@Override
		protected void initChannel(SocketChannel ch) throws Exception {
			// 新增物件解碼器 負責對序列化POJO物件進行解碼 設定物件序列化最大長度為1M 防止記憶體溢位
			// 設定執行緒安全的WeakReferenceMap對類載入器進行快取 支援多執行緒併發訪問 防止記憶體溢位
			ch.pipeline().addLast(
					new ObjectDecoder(1024 * 1024, ClassResolvers
							.weakCachingConcurrentResolver(this.getClass()
									.getClassLoader())));
			// 新增物件編碼器 在伺服器對外發送訊息的時候自動將實現序列化的POJO物件編碼
			ch.pipeline().addLast(new ObjectEncoder());
			// 處理網路IO
			ch.pipeline().addLast(new ServerHandler());
		}
	}

	public static void main(String[] args) throws Exception {
		new Server().bind(9999);
	}
}
5、服務端處理器程式碼
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

public class ServerHandler extends ChannelHandlerAdapter {
	// 用於獲取客戶端傳送的資訊
	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg)
			throws Exception {
		// 用於獲取客戶端發來的資料資訊
		Request body = (Request) msg;
		System.out.println("Server接受的客戶端的資訊 :" + body.toString());

		// 會寫資料給客戶端
		Response response = new Response(Integer.parseInt(body.getUrl()),
				"xiaoming");
		// 當服務端完成寫操作後,關閉與客戶端的連線
		ctx.writeAndFlush(response);
		// .addListener(ChannelFutureListener.CLOSE);

		// 當有寫操作時,不需要手動釋放msg的引用
		// 當只有讀操作時,才需要手動釋放msg的引用
	}

	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
			throws Exception {
		// cause.printStackTrace();
		ctx.close();
	}
}
6、客戶端程式碼
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;

public class Client {

	/**
	 * 連線伺服器
	 * 
	 * @param port
	 * @param host
	 * @throws Exception
	 */
	public void connect(int port, String host) throws Exception {
		// 配置客戶端NIO執行緒組
		EventLoopGroup group = new NioEventLoopGroup();
		try {
			// 客戶端輔助啟動類 對客戶端配置
			Bootstrap b = new Bootstrap();
			b.group(group)//
					.channel(NioSocketChannel.class)//
					.option(ChannelOption.TCP_NODELAY, true)//
					.handler(new MyChannelHandler());//
			// 非同步連結伺服器 同步等待連結成功
			ChannelFuture f = b.connect(host, port).sync();

			// 等待連結關閉
			f.channel().closeFuture().sync();

		} finally {
			group.shutdownGracefully();
			System.out.println("客戶端優雅的釋放了執行緒資源...");
		}

	}

	/**
	 * 網路事件處理器
	 */
	private class MyChannelHandler extends ChannelInitializer<SocketChannel> {
		@Override
		protected void initChannel(SocketChannel ch) throws Exception {
			// 新增自定義的編碼器和解碼器
			// 新增POJO物件解碼器 禁止快取類載入器
			ch.pipeline().addLast(
					new ObjectDecoder(1024, ClassResolvers.cacheDisabled(this
							.getClass().getClassLoader())));
			// 設定傳送訊息編碼器
			ch.pipeline().addLast(new ObjectEncoder());
			// 處理網路IO
			ch.pipeline().addLast(new ClientHandler());// 處理網路IO
		}

	}

	public static void main(String[] args) throws Exception {
		new Client().connect(9999, "127.0.0.1");

	}

}
7、客戶端處理器程式碼
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.ReferenceCountUtil;

//用於讀取客戶端發來的資訊
public class ClientHandler extends ChannelHandlerAdapter {

	// 客戶端與服務端,連線成功的售後
	@Override
	public void channelActive(ChannelHandlerContext ctx) throws Exception {
		// 傳送訊息
		Request request1 = new Request("666");
		Request request2 = new Request("777");
		Request request3 = new Request("888");
		ctx.writeAndFlush(request1);
		ctx.writeAndFlush(request2);
		Thread.sleep(2000);
		ctx.writeAndFlush(request3);
	}

	// 只是讀資料,沒有寫資料的話
	// 需要自己手動的釋放的訊息
	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg)
			throws Exception {
		try {
			Response response = (Response) msg;
			System.out.println(response);

		} finally {
			ReferenceCountUtil.release(msg);
		}
	}

	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
			throws Exception {
		ctx.close();
	}

}
8、Request請求資訊(要實現Serializable介面)
import java.io.Serializable;

public class Request implements Serializable {
	/**
	 * 
	 */
	private static final long serialVersionUID = -7033707301911915196L;
	private String url;

	public Request() {
	}

	public Request(String url) {
		this.url = url;
	}

	public String getUrl() {
		return url;
	}

	public void setUrl(String url) {
		this.url = url;
	}

	@Override
	public String toString() {
		return "Request [url=" + url + "]";
	}

}
9、Response請求資訊(要實現Serializable介面)
import java.io.Serializable;

public class Response implements Serializable {
	/**
	 * 
	 */
	private static final long serialVersionUID = -6236340795725143988L;
	private int age;
	private String name;

	public Response() {
	}

	public Response(int age, String name) {
		this.age = age;
		this.name = name;
	}

	public int getAge() {
		return age;
	}

	public void setAge(int age) {
		this.age = age;
	}

	public String getName() {
		return name;
	}

	public void setName(String name) {
		this.name = name;
	}

	@Override
	public String toString() {
		return "Response [age=" + age + ", name=" + name + "]";
	}

}