NIO入門案例使用netty最新版本框架程式碼實現及詳細註釋
阿新 • • 發佈:2019-01-10
使用的Nettyjar包:netty-all-4.1.27.Final.jar
專案結構:
服務端程式碼
package com.nio.netty.server; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; public class NettyNioServer { public void bind(int port)throws Exception{ // 配置服務端的NIO執行緒組,NioEventLoopGroup是個執行緒組,包含了一組NIO執行緒 // 專門用於網路時間的處理,實際上它們就是Reactor執行緒組 NioEventLoopGroup groupParent = new NioEventLoopGroup(); NioEventLoopGroup groupChild = new NioEventLoopGroup(); try { // 建立ServerBootstrap物件,它是Netty用於啟動Nio服務的輔助類啟動器 // 目的是降低服務端的開發複雜度 ServerBootstrap bootstrap = new ServerBootstrap(); // 將兩個執行緒組當引數傳遞到ServerBootstrap中 // 設定建立的channel為NioServerSocketChannel // 配置NioServerSocketChannel的TCP引數,此處的backlog設定為1024 // 繫結IO事件的處理類ChildChannelHandler,用於處理網路IO事件,例如記錄日誌,對訊息進行編碼等 bootstrap.group(groupParent,groupChild) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) .childHandler(new ChildChannelHandler()); // 呼叫bind方法繫結埠,呼叫同步阻塞方法sync等待繫結操作成功 // 返回值主要用於非同步操作的通知回撥 ChannelFuture future = bootstrap.bind(port).sync(); // 等待服務端監聽埠關閉 future.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); }finally { // 退出,釋放系統資源 groupParent.shutdownGracefully(); groupChild.shutdownGracefully(); } } private class ChildChannelHandler extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(new NettyNioServerHandler()); } } public static void main(String[] args)throws Exception { int port = 8089; if (args != null && args.length > 0){ try { port = Integer.valueOf(args[0]); } catch (NumberFormatException e) { e.printStackTrace(); } } new NettyNioServer().bind(port); } }
package com.nio.netty.server; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import java.util.Date; public class NettyNioServerHandler extends ChannelInboundHandlerAdapter{ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // 將msg轉換成Netty的ByteBuf物件 ByteBuf buf = (ByteBuf) msg; // buf.readableBytes()獲取緩衝區可讀的位元組數 // 根據可讀的位元組數建立新的陣列 byte[] req = new byte[buf.readableBytes()]; // 將緩衝區的位元組陣列複製到新建的位元組byte陣列 buf.readBytes(req); // 對這個位元組陣列進編碼 String body = new String(req, "utf-8"); System.out.println("服務端收到客戶端發來的的訊息是: " + body); // 判斷客戶端發來的訊息和服務端預設值的訊息是否相同 // 如果相同就返回給客戶端當前的時間 String str = "NOW TIME"; String nowTime = "NOTE RIGHT"; if (str.trim().equals(body.trim())){ nowTime = new Date().toString(); } ByteBuf resp = Unpooled.copiedBuffer(nowTime.getBytes()); // 非同步傳送應答訊息給客戶端 ctx.write(resp); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { // 將訊息傳送佇列中的訊息寫入到socketChannel中傳送給對方 /** * 為了防止頻繁的喚醒selector進行訊息傳送,Netty的write方法並不直接將訊息寫入socketChannel中 * 呼叫write方法只是把待發送的訊息放到傳送緩衝區陣列, * 在通過呼叫flush方法,將緩衝區中的訊息全部寫到socketChannel中中 */ ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { // 當發生異常的時候,關閉ChannelHandlerContext,釋放和ChannelHandlerContext相關聯的控制代碼等資源 ctx.close(); } }
客戶端程式碼
package com.nio.netty.client; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; public class NettyNioClient { public void connect(int port, String host)throws Exception{ // 配置服務端的NIO執行緒組,NioEventLoopGroup是個執行緒組,包含了一組NIO執行緒 // 專門用於網路時間的處理,實際上它們就是Reactor執行緒組 NioEventLoopGroup group = new NioEventLoopGroup(); try { // 建立ServerBootstrap物件,它是Netty用於啟動Nio服務的輔助類啟動器 // 目的是降低服務端的開發複雜度 Bootstrap bootstrap = new Bootstrap(); // 於服務端不同channel(NioSocketChannel.class) /** * handler,建立匿名內部類,實現initChannel方法, * 作用是當建立NioSocketChannel成功之後 * 在進行初始化時,將它的channelHandler設定到ChannelPipeline中, * 用於處理網路IO事件 */ bootstrap.group(group) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(new NettyClientHandler()); } }); // 發起非同步連線操作,呼叫同步阻塞方法等待連線成功 ChannelFuture sync = bootstrap.connect(host, port).sync(); // 等待客戶端鏈路關閉 sync.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); }finally { // 退出,釋放資源 group.shutdownGracefully(); } } public static void main(String[] args) throws Exception{ int port = 8089; if (args != null && args.length > 0){ try { port = Integer.valueOf(args[0]); } catch (NumberFormatException e) { e.printStackTrace(); } } new NettyNioClient().connect(port,"localhost"); } }
package com.nio.netty.client;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.util.logging.Logger;
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
/**
* 日誌
*/
private static final Logger logger = Logger.getLogger(NettyClientHandler.class.getName());
private final ByteBuf firstMessage;
public NettyClientHandler() {
byte[] req = "NOW TIME".getBytes();
firstMessage = Unpooled.buffer(req.length);
firstMessage.writeBytes(req);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 呼叫ChannelHandlerContext的writeAndFlush方法將請求訊息傳送給服務端
ctx.writeAndFlush(firstMessage);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
// buf.readableBytes()獲取緩衝區可讀的位元組數
// 根據可讀的位元組數建立新的陣列
byte[] req = new byte[buf.readableBytes()];
// 將緩衝區的位元組陣列複製到新建的位元組byte陣列
buf.readBytes(req);
// 編碼
String body = new String(req, "utf-8");
// 列印服務端返回的訊息
System.out.println("客戶端收到服務端返回的訊息是: " + body);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// 釋放資源
logger.warning("不期而遇的異常:" + cause.getMessage());
ctx.close();
}
}
【1】NIO模擬粘問題的程式碼具體實現以及詳細註釋URL: