1. 程式人生 > >NIO入門案例使用netty最新版本框架程式碼實現及詳細註釋

NIO入門案例使用netty最新版本框架程式碼實現及詳細註釋

使用的Nettyjar包:netty-all-4.1.27.Final.jar

專案結構:

服務端程式碼

package com.nio.netty.server;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class NettyNioServer {

    public void bind(int port)throws Exception{
        // 配置服務端的NIO執行緒組,NioEventLoopGroup是個執行緒組,包含了一組NIO執行緒
        // 專門用於網路時間的處理,實際上它們就是Reactor執行緒組
        NioEventLoopGroup groupParent = new NioEventLoopGroup();
        NioEventLoopGroup groupChild = new NioEventLoopGroup();

        try {
            // 建立ServerBootstrap物件,它是Netty用於啟動Nio服務的輔助類啟動器
            // 目的是降低服務端的開發複雜度
            ServerBootstrap bootstrap = new ServerBootstrap();
            // 將兩個執行緒組當引數傳遞到ServerBootstrap中
            // 設定建立的channel為NioServerSocketChannel
            // 配置NioServerSocketChannel的TCP引數,此處的backlog設定為1024
            // 繫結IO事件的處理類ChildChannelHandler,用於處理網路IO事件,例如記錄日誌,對訊息進行編碼等
            bootstrap.group(groupParent,groupChild)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .childHandler(new ChildChannelHandler());
            // 呼叫bind方法繫結埠,呼叫同步阻塞方法sync等待繫結操作成功
            // 返回值主要用於非同步操作的通知回撥
            ChannelFuture future = bootstrap.bind(port).sync();
            // 等待服務端監聽埠關閉
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            // 退出,釋放系統資源
            groupParent.shutdownGracefully();
            groupChild.shutdownGracefully();
        }
    }

    private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {

        @Override
        protected void initChannel(SocketChannel socketChannel) throws Exception {
            socketChannel.pipeline().addLast(new NettyNioServerHandler());
        }
    }


    public static void main(String[] args)throws Exception {
        int port = 8089;
        if (args != null && args.length > 0){
            try {
                port = Integer.valueOf(args[0]);
            } catch (NumberFormatException e) {
                e.printStackTrace();
            }

        }
        new NettyNioServer().bind(port);

    }


}
package com.nio.netty.server;


import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.util.Date;

public class NettyNioServerHandler extends ChannelInboundHandlerAdapter{

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // 將msg轉換成Netty的ByteBuf物件
        ByteBuf buf = (ByteBuf) msg;
        // buf.readableBytes()獲取緩衝區可讀的位元組數
        // 根據可讀的位元組數建立新的陣列
        byte[] req = new byte[buf.readableBytes()];
        // 將緩衝區的位元組陣列複製到新建的位元組byte陣列
        buf.readBytes(req);
        // 對這個位元組陣列進編碼
        String body = new String(req, "utf-8");
        System.out.println("服務端收到客戶端發來的的訊息是: " + body);

        // 判斷客戶端發來的訊息和服務端預設值的訊息是否相同
        // 如果相同就返回給客戶端當前的時間
        String str = "NOW TIME";
        String nowTime = "NOTE RIGHT";
        if (str.trim().equals(body.trim())){
            nowTime = new Date().toString();
        }
        ByteBuf resp = Unpooled.copiedBuffer(nowTime.getBytes());
        // 非同步傳送應答訊息給客戶端
        ctx.write(resp);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        // 將訊息傳送佇列中的訊息寫入到socketChannel中傳送給對方
        /**
         * 為了防止頻繁的喚醒selector進行訊息傳送,Netty的write方法並不直接將訊息寫入socketChannel中
         * 呼叫write方法只是把待發送的訊息放到傳送緩衝區陣列,
         * 在通過呼叫flush方法,將緩衝區中的訊息全部寫到socketChannel中中
         */
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
       // 當發生異常的時候,關閉ChannelHandlerContext,釋放和ChannelHandlerContext相關聯的控制代碼等資源
        ctx.close();
    }
}

客戶端程式碼

package com.nio.netty.client;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class NettyNioClient {

    public void connect(int port, String host)throws Exception{
        // 配置服務端的NIO執行緒組,NioEventLoopGroup是個執行緒組,包含了一組NIO執行緒
        // 專門用於網路時間的處理,實際上它們就是Reactor執行緒組
        NioEventLoopGroup group = new NioEventLoopGroup();

        try {
            // 建立ServerBootstrap物件,它是Netty用於啟動Nio服務的輔助類啟動器
            // 目的是降低服務端的開發複雜度
            Bootstrap bootstrap = new Bootstrap();
            // 於服務端不同channel(NioSocketChannel.class)
            /**
             * handler,建立匿名內部類,實現initChannel方法,
             * 作用是當建立NioSocketChannel成功之後
             * 在進行初始化時,將它的channelHandler設定到ChannelPipeline中,
             * 用於處理網路IO事件
             */
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline().addLast(new NettyClientHandler());
                        }
                    });
            // 發起非同步連線操作,呼叫同步阻塞方法等待連線成功
            ChannelFuture sync = bootstrap.connect(host, port).sync();
            // 等待客戶端鏈路關閉
            sync.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            // 退出,釋放資源
            group.shutdownGracefully();
        }

    }

    public static void main(String[] args) throws Exception{
        int port = 8089;
        if (args != null && args.length > 0){
            try {
                port = Integer.valueOf(args[0]);
            } catch (NumberFormatException e) {
                e.printStackTrace();
            }

        }
        new NettyNioClient().connect(port,"localhost");
    }
}
package com.nio.netty.client;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.util.logging.Logger;

public class NettyClientHandler extends ChannelInboundHandlerAdapter {
    /**
     * 日誌
     */
    private static  final Logger logger = Logger.getLogger(NettyClientHandler.class.getName());

    private final ByteBuf firstMessage;

    public NettyClientHandler() {
        byte[] req = "NOW TIME".getBytes();
        firstMessage = Unpooled.buffer(req.length);
        firstMessage.writeBytes(req);
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // 呼叫ChannelHandlerContext的writeAndFlush方法將請求訊息傳送給服務端
        ctx.writeAndFlush(firstMessage);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        // buf.readableBytes()獲取緩衝區可讀的位元組數
        // 根據可讀的位元組數建立新的陣列
        byte[] req = new byte[buf.readableBytes()];
        // 將緩衝區的位元組陣列複製到新建的位元組byte陣列
        buf.readBytes(req);
        // 編碼
        String body = new String(req, "utf-8");
        // 列印服務端返回的訊息
        System.out.println("客戶端收到服務端返回的訊息是: " + body);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // 釋放資源
        logger.warning("不期而遇的異常:" + cause.getMessage());
        ctx.close();
    }
}

【1】NIO模擬粘問題的程式碼具體實現以及詳細註釋URL: