1. 程式人生 > >Netty之實現自定義簡單的編解碼器一(MessageToByteEncoder和ByteToMessageDecoder)

Netty之實現自定義簡單的編解碼器一(MessageToByteEncoder和ByteToMessageDecoder)

1、關於自定義編碼器的簡介

      在這裡實現的編解碼器很簡單。編碼器的功能實現的是,int--->byets的編碼;解碼器實現的是,bytes--->int的解碼。

2、編碼器的實現

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

public class IntegerToByteEncoder extends MessageToByteEncoder<Integer> {

	@Override
	protected void encode(ChannelHandlerContext ctx, Integer msg, ByteBuf out)
			throws Exception {
		System.out.println("IntegerToByteEncoder encode msg is " + msg);
		out.writeInt(msg);
	}
}
3、解碼器的實現
import java.util.List;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;

public class ByteToIntegerDecoder extends ByteToMessageDecoder {
	@Override
	protected void decode(ChannelHandlerContext ctx, ByteBuf in,
			List<Object> out) throws Exception {
		// Check if there are at least 4 bytes readable
		if (in.readableBytes() >= 4) {
			int n = in.readInt();
			System.out.println("ByteToIntegerDecoder decode msg is " + n);
			// Read integer from inbound ByteBuf
			// add to the List of decodec messages
			out.add(n);
		}
	}
}
4、使用編解碼器的步驟

     4.1  加入編解碼器

         

     4.2  傳送資訊,編碼方式的改變

          因為加入了int--->bytes的編碼器。所以,再發送資料的時候,可以直接傳送Integer型別的資料。不需要在手動的,將Integer利用Unpooled工具類轉換為ByteBuf型別在傳送。

          4.2.1  使用編碼器之前的程式碼方式


          4.2.1  使用編碼器之後的程式碼方式


     4.3  接受資訊,編碼方式的改變

           因為加入了bytes--->int的解碼器。所以,可以將訊息轉換為Integer型別處理。

          4.3.1  沒加入解碼器前的程式碼方式

              

          4.3.2  加入瞭解碼器後的程式碼方式

                

5、服務端的實現

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class Server {
	public void bind(int port) throws Exception {
		// 伺服器執行緒組 用於網路事件的處理 一個用於伺服器接收客戶端的連線
		// 另一個執行緒組用於處理SocketChannel的網路讀寫
		EventLoopGroup bossGroup = new NioEventLoopGroup();
		EventLoopGroup workerGroup = new NioEventLoopGroup();
		try {
			// NIO伺服器端的輔助啟動類 降低伺服器開發難度
			ServerBootstrap serverBootstrap = new ServerBootstrap();
			serverBootstrap.group(bossGroup, workerGroup)
					.channel(NioServerSocketChannel.class)// 類似NIO中serverSocketChannel
					.option(ChannelOption.SO_BACKLOG, 1024)// 配置TCP引數
					.option(ChannelOption.SO_BACKLOG, 1024) // 設定tcp緩衝區
					.option(ChannelOption.SO_SNDBUF, 32 * 1024) // 設定傳送緩衝大小
					.option(ChannelOption.SO_RCVBUF, 32 * 1024) // 這是接收緩衝大小
					.option(ChannelOption.SO_KEEPALIVE, true) // 保持連線
					.childHandler(new ChildChannelHandler());// 最後繫結I/O事件的處理類
																// 處理網路IO事件

			// 伺服器啟動後 繫結監聽埠 同步等待成功 主要用於非同步操作的通知回撥 回撥處理用的ChildChannelHandler
			ChannelFuture f = serverBootstrap.bind(port).sync();
			System.out.println("Server啟動");
			// 等待服務端監聽埠關閉
			f.channel().closeFuture().sync();

		} finally {
			// 優雅退出 釋放執行緒池資源
			bossGroup.shutdownGracefully();
			workerGroup.shutdownGracefully();
			System.out.println("伺服器優雅的釋放了執行緒資源...");
		}

	}

	/**
	 * 網路事件處理器
	 */
	private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
		@Override
		protected void initChannel(SocketChannel ch) throws Exception {
			// 增加自定義的編碼器和解碼器
			ch.pipeline().addLast(new IntegerToByteEncoder());
			ch.pipeline().addLast(new ByteToIntegerDecoder());
			// 服務端的處理器
			ch.pipeline().addLast(new ServerHandler());
		}
	}

	public static void main(String[] args) throws Exception {
		int port = 9998;
		new Server().bind(port);
	}
}
6、服務端Handler的實現
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

public class ServerHandler extends ChannelHandlerAdapter {

	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg)
			throws Exception {
		// 接受客戶端的資料
		Integer body = (Integer) msg;
		System.out.println("Client :" + body.toString());
		// 服務端,回寫資料給客戶端
		// 直接回寫整形的資料
		ctx.writeAndFlush(33);

	}

	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
			throws Exception {
		ctx.close();
	}

	@Override
	public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
		ctx.flush();
	}
}
7、客戶端的實現
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class Client {
	/**
	 * 連線伺服器
	 * 
	 * @param port
	 * @param host
	 * @throws Exception
	 */
	public void connect(int port, String host) throws Exception {
		// 配置客戶端NIO執行緒組
		EventLoopGroup group = new NioEventLoopGroup();
		try {
			// 客戶端輔助啟動類 對客戶端配置
			Bootstrap b = new Bootstrap();
			b.group(group)//
					.channel(NioSocketChannel.class)//
					.option(ChannelOption.TCP_NODELAY, true)//
					.handler(new ClientChannelHandler());//
			// 非同步連結伺服器 同步等待連結成功
			ChannelFuture f = b.connect(host, port).sync();

			// 傳送訊息
			Thread.sleep(1000);
			f.channel().writeAndFlush(777);
			f.channel().writeAndFlush(666);
			Thread.sleep(2000);
			f.channel().writeAndFlush(888);

			// 等待連結關閉
			f.channel().closeFuture().sync();

		} finally {
			group.shutdownGracefully();
			System.out.println("客戶端優雅的釋放了執行緒資源...");
		}

	}

	/**
	 * 網路事件處理器
	 */
	private class ClientChannelHandler extends
			ChannelInitializer<SocketChannel> {
		@Override
		protected void initChannel(SocketChannel ch) throws Exception {
			// 增加自定義的編碼器和解碼器
			ch.pipeline().addLast(new IntegerToByteEncoder());
			ch.pipeline().addLast(new ByteToIntegerDecoder());
			// 客戶端的處理器
			ch.pipeline().addLast(new ClientHandler());
		}

	}

	public static void main(String[] args) throws Exception {
		new Client().connect(9998, "127.0.0.1");

	}
}
8、客戶端Handler的實現
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.ReferenceCountUtil;

public class ClientHandler extends ChannelHandlerAdapter {

	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg)
			throws Exception {
		try {
			Integer body = (Integer) msg;
			System.out.println("Client :" + body.toString());

			// 只是讀資料,沒有寫資料的話
			// 需要自己手動的釋放的訊息

		} finally {
			ReferenceCountUtil.release(msg);
		}
	}

	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
			throws Exception {
		ctx.close();
	}

}
9、程式的執行結果

Server端

Server啟動
ByteToIntegerDecoder decode msg is 777
ByteToIntegerDecoder decode msg is 666
Client :777
IntegerToByteEncoder encode msg is 33
Client :666
IntegerToByteEncoder encode msg is 33
ByteToIntegerDecoder decode msg is 888
Client :888
IntegerToByteEncoder encode msg is 33
Client端
IntegerToByteEncoder encode msg is 777
IntegerToByteEncoder encode msg is 666
ByteToIntegerDecoder decode msg is 33
ByteToIntegerDecoder decode msg is 33
Client :33
Client :33
IntegerToByteEncoder encode msg is 888
ByteToIntegerDecoder decode msg is 33
Client :33

10、原始碼下載