1. 程式人生 > >Netty簡單示例----客戶端與伺服器通訊

Netty簡單示例----客戶端與伺服器通訊

本例實現的功能為客戶端向伺服器查詢當前時間並顯示,參考《Netty權威指南》一書。不過書中是使用Netty 5實現的,考慮到Netty 5已被官方下架,本例改用Netty 4.1.6版本。
首先伺服器端,包括TimeServer、TimeServerHandler兩個類:
TimeServer

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption
; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; public class TimeServer { public void bind(int port) throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup(); ServerBootstrap bootstrap = new ServerBootstrap(); try { bootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) .childHandler
(new ChildChannelHandler()); //繫結埠, 同步等待成功; ChannelFuture future = bootstrap.bind(port).sync(); //等待服務端監聽埠關閉 future.channel().closeFuture().sync(); } finally { //優雅關閉 執行緒組 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } private class ChildChannelHandler extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast("timeServerHandler",new TimeServerHandler()); } } public static void main(String[] args) throws Exception { int port = 443; new TimeServer().bind(port); } }

TimeServerHandler

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

/**
 * Server-side handler: answers the request "Query Time Order" (case-insensitive)
 * with the current server time, and anything else with "Bad Order".
 *
 * <p>NOTE(review): no frame decoder is visible in the pipeline, so this handler
 * assumes each read delivers exactly one complete request — confirm if requests
 * can be split or merged on the wire.
 */
public class TimeServerHandler extends SimpleChannelInboundHandler<Object> {

    /**
     * Decodes the inbound bytes as a UTF-8 request and queues the response.
     *
     * @param ctx context of this handler in the channel pipeline
     * @param msg inbound message; cast to {@link ByteBuf}
     * @throws Exception if the message cannot be processed (e.g. it is not a {@code ByteBuf})
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg)
            throws Exception {
        System.out.println("Server start read");
        ByteBuf buf = (ByteBuf) msg;
        byte[] req = new byte[buf.readableBytes()];
        buf.readBytes(req);
        String body = new String(req, java.nio.charset.StandardCharsets.UTF_8);

        System.out.println("The time server receive order : " + body);
        String currentTime = "Query Time Order".equalsIgnoreCase(body)
                ? new java.util.Date(System.currentTimeMillis()).toString()
                : "Bad Order";

        // Encode the reply with the same charset used to decode the request,
        // instead of the platform default charset.
        ByteBuf resp = Unpooled.copiedBuffer(
                currentTime.getBytes(java.nio.charset.StandardCharsets.UTF_8));
        // write() only queues the response; it is sent by the flush in channelReadComplete.
        ctx.write(resp);
    }

    /**
     * Flushes the responses queued by {@link #channelRead0} to the channel.
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    /**
     * Reports the failure and closes the channel.
     *
     * @param ctx   context of this handler in the channel pipeline
     * @param cause the exception raised in the pipeline
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        // Do not drop the cause silently; otherwise failures are undiagnosable.
        cause.printStackTrace();
        ctx.close();
    }
}

客戶端,包括TimeClient和TimeClientHandler兩個類:
TimeClient

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

/**
 * Time client bootstrap: connects to the time server and installs a
 * {@link TimeClientHandler} on the connection.
 */
public class TimeClient {

    /**
     * Connects to {@code host:port} and blocks until the connection is closed.
     *
     * @param port TCP port of the server
     * @param host host name or address of the server
     * @throws Exception if the connection attempt fails or the waiting thread is interrupted
     */
    public void connect(int port, String host) throws Exception{
        // Event loop group that services the client channel.
        EventLoopGroup workers = new NioEventLoopGroup();
        Bootstrap bootstrap = new Bootstrap();
        try {
            ChannelInitializer<SocketChannel> pipelineSetup =
                    new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new TimeClientHandler());
                        }
                    };

            bootstrap.group(workers);
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.option(ChannelOption.TCP_NODELAY, true);
            bootstrap.handler(pipelineSetup);

            // Start the connection attempt and wait until it has completed.
            ChannelFuture connected = bootstrap.connect(host, port).sync();

            // Block until the client channel is closed.
            connected.channel().closeFuture().sync();
        } finally {
            // Gracefully shut down the event loop group.
            workers.shutdownGracefully();
        }
    }

    /**
     * Connects to the time server at 127.0.0.1:443 and prints any failure.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        final int serverPort = 443;
        final TimeClient timeClient = new TimeClient();
        try {
            timeClient.connect(serverPort, "127.0.0.1");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

TimeClientHandler

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

/**
 * Client-side handler: sends "QUERY TIME ORDER" once the channel becomes
 * active and prints whatever the server sends back.
 */
public class TimeClientHandler extends ChannelInboundHandlerAdapter{
    /** Request buffer written to the server when the channel becomes active. */
    private final ByteBuf firstMSG;

    /**
     * Pre-builds the request buffer, encoded as UTF-8.
     */
    public TimeClientHandler() {
        // Explicit UTF-8 rather than the platform default charset.
        byte[] req = "QUERY TIME ORDER".getBytes(java.nio.charset.StandardCharsets.UTF_8);
        firstMSG = Unpooled.buffer(req.length);
        firstMSG.writeBytes(req);
    }

    /**
     * Sends the time query as soon as the connection is established.
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.writeAndFlush(firstMSG);
    }

    /**
     * Decodes the server's reply as UTF-8 and prints it.
     *
     * @param ctx context of this handler in the channel pipeline
     * @param msg inbound message; cast to {@link ByteBuf}
     * @throws Exception if the message cannot be processed (e.g. it is not a {@code ByteBuf})
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf)msg;
        try {
            byte[] req = new byte[buf.readableBytes()];
            buf.readBytes(req);
            String body = new String(req, java.nio.charset.StandardCharsets.UTF_8);
            System.out.println("NOW is: " + body);
        } finally {
            // The message is consumed here and not passed further down the
            // pipeline, so this handler must release the buffer itself to
            // avoid leaking it.
            buf.release();
        }
    }

    /**
     * Reports the failure and closes the channel.
     *
     * @param ctx   context of this handler in the channel pipeline
     * @param cause the exception raised in the pipeline
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // Do not drop the cause silently; otherwise failures are undiagnosable.
        cause.printStackTrace();
        ctx.close();
    }
}

啟動順序:先啟動服務端,再啟動客戶端。伺服器和客戶端分別得到的輸出如下:
服務端:
Server start read
The time server receive order : QUERY TIME ORDER
客戶端:
NOW is: Thu Jun 08 11:08:06 CST 2017