Netty(二) springboot 整合netty編寫時間伺服器
阿新 • • 發佈:2018-12-16
這個例子與上個例子( springboot 整合netty做心跳檢測)最大的不同就是,服務端傳送包含32位整數的訊息,而不接收任何請求,並在傳送訊息後關閉連線。
因為我們將忽略任何接收到的資料,一旦建立連線就傳送訊息,這次我們不能使用channelRead()方法。 相反,我們應該重寫channelActive()方法。
專案依賴:
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.21.Final</ version>
</dependency>
服務端
public class TimeServerHandler extends ChannelInboundHandlerAdapter {
/**
* 這次我們不能使用channelRead()方法。相反,我們應該重寫channelActive()方法
*/
public void channelActive(final ChannelHandlerContext ctx) throws Exception {
/**
* 要傳送新訊息,我們需要分配一個包含訊息的新緩衝區。我們要寫一個32位整數,
* 因此我們需要一個容量至少為4個位元組的ByteBuf。
* 通過ChannelHandlerContext.alloc()獲取當前的ByteBufAllocator並分配一個新的緩衝區。
*/
final ByteBuf time = ctx.alloc().buffer(4);
time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L));
final ChannelFuture cf = ctx.writeAndFlush(time);
/**
* 我們怎麼知道寫請求是否完成?這就像向返回的ChannelFuture新增ChannelFutureListener一樣。
* 在這裡,我們建立了一個新的匿名ChannelFutureListener,它在操作完成時關閉Channel
*/
/*cf.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
assert cf == future;
ctx.close();
}
});*/
//為了簡化程式碼也可以這麼寫
cf.addListener(ChannelFutureListener.CLOSE);
}
/**
* 當由於I / O錯誤或由於處理事件時丟擲異常導致的處理程式實現而由Netty引發異常時,使用Throwable呼叫exceptionCaught()事件處理程式方法。
* 在大多數情況下,應記錄捕獲的異常並在此處關閉其關聯的通道,儘管此方法的實現可能會有所不同,具體取決於您要處理特殊情況的操作。
* 例如,您可能希望在關閉連線之前傳送帶有錯誤程式碼的響應訊息。
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
@Component
public class DiscardServer {
@Value("${netty.server.port}")
private int port;
@PostConstruct
public void run() throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new TimeServerHandler());
}
})
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true);
// 繫結並開始接受傳入連線。
ChannelFuture f = b.bind(port).sync();
// 等到伺服器套接字關閉。
// shut down your server.
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
}
客戶端程式碼:
@Component
public class TimeClient {
@Value("${netty.server.port}")
private int port;
@Value("${netty.server.host}")
private String host;
@PostConstruct
public void timeClient() throws InterruptedException {
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(workerGroup).channel(NioSocketChannel.class);
bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) {
socketChannel.pipeline().addLast(new TimeClientHandler());
}
});
//啟動客戶端
ChannelFuture f = bootstrap.connect(host, port).sync();
//等到連線關閉
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
}
}
}
下面程式碼接受服務端的訊息並列印
public class TimeClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//Netty將從對等方傳送的資料讀入ByteBuf
ByteBuf m = (ByteBuf) msg;
try {
long currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L;
System.out.println("收到服務端傳送的訊息"+new Date(currentTimeMillis));
ctx.close();
} finally {
m.release();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
列印結果: