A Simple Netty Example: Client-Server Communication
阿新 • Published: 2019-02-03
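In this example the client asks the server for the current time and prints it. It follows the book 《Netty權威指南》 (Netty: The Definitive Guide), which implements it with Netty 5. Since Netty 5 has been withdrawn, this example uses Netty 4.1.6.
First the server side, which consists of two classes, TimeServer and TimeServerHandler: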
TimeServer
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
public class TimeServer {

    public void bind(int port) throws Exception {
        // The boss group accepts connections; the worker group handles I/O on accepted channels
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        ServerBootstrap bootstrap = new ServerBootstrap();
        try {
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .childHandler(new ChildChannelHandler());
            // Bind the port and wait synchronously until the bind succeeds
            ChannelFuture future = bootstrap.bind(port).sync();
            // Block until the server's listening channel is closed
            future.channel().closeFuture().sync();
        } finally {
            // Shut down both thread groups gracefully
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ch.pipeline().addLast("timeServerHandler", new TimeServerHandler());
        }
    }

    public static void main(String[] args) throws Exception {
        // Ports below 1024 usually require elevated privileges on Unix-like systems
        int port = 443;
        new TimeServer().bind(port);
    }
}
TimeServerHandler
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import java.nio.charset.StandardCharsets;

public class TimeServerHandler extends SimpleChannelInboundHandler<Object> {

    // SimpleChannelInboundHandler releases msg automatically after channelRead0 returns
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg)
            throws Exception {
        System.out.println("Server start read");
        ByteBuf buf = (ByteBuf) msg;
        byte[] req = new byte[buf.readableBytes()];
        buf.readBytes(req);
        String body = new String(req, StandardCharsets.UTF_8);
        System.out.println("The time server receive order : " + body);
        String currentTime = "Query Time Order".equalsIgnoreCase(body)
                ? new java.util.Date(System.currentTimeMillis()).toString()
                : "Bad Order";
        // Send the response asynchronously: write() does not put the message on the
        // SocketChannel directly, it only queues it in the outbound buffer
        ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes(StandardCharsets.UTF_8));
        ctx.write(resp);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        // Push everything queued by write() out to the SocketChannel
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        // Report the failure instead of dropping it silently, then close the connection
        cause.printStackTrace();
        ctx.close();
    }
}
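The comment in channelRead0 says that write() only queues the response and that nothing reaches the socket until flush() runs in channelReadComplete. The sketch below illustrates the same write-then-flush idea with plain JDK streams. It is only an analogy, not Netty code: BufferedOutputStream stands in for the outbound buffer, ByteArrayOutputStream stands in for the socket, and the class name WriteThenFlushSketch and its method are made up for this illustration.

```java
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical illustration class, not part of the original example
final class WriteThenFlushSketch {

    // Returns {bytes in the sink after write(), bytes in the sink after flush()}
    static int[] visibleBytes(String message) {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();        // stand-in for the socket
        BufferedOutputStream out = new BufferedOutputStream(sink, 1024); // stand-in for the outbound buffer
        try {
            out.write(message.getBytes(StandardCharsets.UTF_8)); // buffered only (message is shorter than 1024 bytes)
            int afterWrite = sink.size();
            out.flush();                                         // now the bytes reach the sink
            int afterFlush = sink.size();
            return new int[] {afterWrite, afterFlush};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int[] sizes = visibleBytes("Bad Order");
        System.out.println("after write: " + sizes[0]); // after write: 0
        System.out.println("after flush: " + sizes[1]); // after flush: 9
    }
}
```

Calling flush() once per read batch, as the handler does in channelReadComplete, lets several write() calls go out together rather than one by one.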
Next the client side, which consists of two classes, TimeClient and TimeClientHandler:
TimeClient
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
public class TimeClient {

    public void connect(int port, String host) throws Exception {
        // Configure the client's NIO thread group
        EventLoopGroup group = new NioEventLoopGroup();
        Bootstrap client = new Bootstrap();
        try {
            client.group(group)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new TimeClientHandler());
                        }
                    });
            // Start the asynchronous connect and wait synchronously until it completes
            ChannelFuture future = client.connect(host, port).sync();
            // Block until the client channel is closed
            future.channel().closeFuture().sync();
        } finally {
            // Shut down the thread group gracefully
            group.shutdownGracefully();
        }
    }

    public static void main(String[] args) {
        int port = 443;
        TimeClient client = new TimeClient();
        try {
            client.connect(port, "127.0.0.1");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
TimeClientHandler
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.nio.charset.StandardCharsets;

public class TimeClientHandler extends ChannelInboundHandlerAdapter {

    private final ByteBuf firstMSG;

    public TimeClientHandler() {
        byte[] req = "QUERY TIME ORDER".getBytes(StandardCharsets.UTF_8);
        firstMSG = Unpooled.buffer(req.length);
        firstMSG.writeBytes(req);
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // Send the query as soon as the connection is established
        ctx.writeAndFlush(firstMSG);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        try {
            byte[] req = new byte[buf.readableBytes()];
            buf.readBytes(req);
            String body = new String(req, StandardCharsets.UTF_8);
            System.out.println("NOW is: " + body);
        } finally {
            // ChannelInboundHandlerAdapter does not release the message, so release it here
            buf.release();
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // Report the failure, then close the connection
        cause.printStackTrace();
        ctx.close();
    }
}
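Both handlers decode a message the same way: size a byte array from readableBytes(), copy the bytes out with readBytes(), and build a String. The sketch below shows the same steps with the JDK's java.nio.ByteBuffer as a stand-in for Netty's ByteBuf, so it runs without Netty on the classpath. The class name BufferDecodeSketch and the method readAll are made up for this illustration. One difference to keep in mind: ByteBuffer has a single position and needs flip() before reading, while ByteBuf keeps separate reader and writer indices.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical illustration class, using ByteBuffer in place of Netty's ByteBuf
final class BufferDecodeSketch {

    // Copies all unread bytes out of the buffer and decodes them as UTF-8
    static String readAll(ByteBuffer buf) {
        byte[] bytes = new byte[buf.remaining()]; // plays the role of buf.readableBytes()
        buf.get(bytes);                           // plays the role of buf.readBytes(req)
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(64);
        buf.put("QUERY TIME ORDER".getBytes(StandardCharsets.UTF_8));
        buf.flip(); // switch from writing to reading; ByteBuf needs no such call
        System.out.println(readAll(buf));    // QUERY TIME ORDER
        System.out.println(buf.remaining()); // 0
    }
}
```

As with readBytes() on a ByteBuf, reading consumes the data: after readAll returns, nothing is left to read from the buffer.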
Start the server first, then the client. The server and the client then print the following output, respectively:
Server:
Server start read
The time server receive order : QUERY TIME ORDER
Client:
NOW is: Thu Jun 08 11:08:06 CST 2017
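The server log shows the order arriving as "QUERY TIME ORDER", while the handler compares it against "Query Time Order". The match succeeds because the comparison uses equalsIgnoreCase. The sketch below pulls that decision out of the handler so it can be tried without Netty. The class TimeOrderLogic and its respond method are hypothetical helpers, not part of the original code, and the sketch assumes UTF-8 on both sides.

```java
import java.nio.charset.StandardCharsets;
import java.util.Date;

// Hypothetical helper: the decision logic of TimeServerHandler without any Netty types
final class TimeOrderLogic {
    static final String QUERY = "Query Time Order";
    static final String BAD_ORDER = "Bad Order";

    // Decodes the request as UTF-8 and returns the reply encoded as UTF-8
    static byte[] respond(byte[] request) {
        String body = new String(request, StandardCharsets.UTF_8);
        String reply = QUERY.equalsIgnoreCase(body)
                ? new Date().toString()
                : BAD_ORDER;
        return reply.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] ok = respond("QUERY TIME ORDER".getBytes(StandardCharsets.UTF_8));
        byte[] bad = respond("hello".getBytes(StandardCharsets.UTF_8));
        System.out.println(new String(ok, StandardCharsets.UTF_8));  // the current time; exact text depends on the JVM time zone
        System.out.println(new String(bad, StandardCharsets.UTF_8)); // Bad Order
    }
}
```

The comparison is exact apart from letter case, so an order with extra characters, such as a trailing newline, would get "Bad Order" back.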