netty 入門(一)
netty Netty提供異步的、事件驅動的網絡應用程序框架和工具,用以快速開發高性能、高可靠性的網絡服務器和客戶端程序。
更確切的講是一個組件,沒有那麽復雜。
例子 一 Discard服務器端
我們先寫一個簡單的服務端和客戶端作為入門,接下來我們在深入介紹裏面的內容 :(基於netty4 )
package io.netty.example.discard; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter;/** * Handles a server-side channel. */ public class DiscardServerHandler extends ChannelInboundHandlerAdapter { // (1) @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { // (2) // Discard the received data silently. ((ByteBuf) msg).release(); // (3) } @Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (4) // Close the connection when an exception is raised. cause.printStackTrace(); ctx.close(); } }
這是服務端的處理引擎,繼承於ChannelInboundHandlerAdapter,但也可以實現接口
ChannelInboundHandler,不過需要實現很多方法。
channelRead()
ByteBuf是一個引用計數對象,必須通過release()方法顯式釋放。 請記住,處理程序有責任釋放傳遞給處理程序的引用計數對象。 通常,channelRead()處理方法的實現方式 如下:
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) { try { // Do something with msg } finally { ReferenceCountUtil.release(msg); } }
exceptionCaught()
是異常處理方法,通常是日誌記錄,異常具體處理,如發送錯誤信息並關閉連接等操作。
接下來,我們編寫服務端的主程序:
package io.netty.example.discard; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; /** * Discards any incoming data. */ public class DiscardServer { private int port; public DiscardServer(int port) { this.port = port; } public void run() throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1) EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); // (2) b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) // (3) .childHandler(new ChannelInitializer<SocketChannel>() { // (4) @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new DiscardServerHandler()); } }) .option(ChannelOption.SO_BACKLOG, 128) // (5) .childOption(ChannelOption.SO_KEEPALIVE, true); // (6) // Bind and start to accept incoming connections. ChannelFuture f = b.bind(port).sync(); // (7) // Wait until the server socket is closed. // In this example, this does not happen, but you can do that to gracefully // shut down your server. f.channel().closeFuture().sync(); } finally { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } public static void main(String[] args) throws Exception { int port; if (args.length > 0) { port = Integer.parseInt(args[0]); } else { port = 8080; } new DiscardServer(port).run(); } }
1,NioEventLoopGroup是一個處理I / O操作的多線程事件循環。 Netty為不同類型的傳輸提供了各種EventLoopGroup實現。 在這個例子中我們正在實現一個服務器端的應用程序,因此將使用兩個NioEventLoopGroup。 第一個,通常稱為“老板”,接受傳入的連接。 第二個,通常稱為“工人”,一旦老板接受連接並將接受的連接註冊到工作人員,就處理接受的連接的流量。 使用多少個線程以及它們如何映射到創建的通道取決於EventLoopGroup實現,甚至可以通過構造函數進行配置。
2,ServerBootstrap是一個幫助類,用於設置服務器。 您可以直接使用Channel
設置服務器。 但是,請註意,這是一個繁瑣的過程,在大多數情況下您不需要這樣做。
3,在這裏,我們指定使用NioServerSocketChannel類來實例化一個新的通道來接受傳入的連接。
4,這裏指定的處理程序將始終由新接受的Channel
進行評估。 ChannelInitializer是一個特殊的處理程序,旨在幫助用戶配置新的Channel
。 很可能您想通過添加一些處理程序(如DiscardServerHandler)來配置新通道的ChannelPipeline來實現您的網絡應用程序。 隨著應用程序變得復雜,您可能會在ChannelPipeline
中添加更多的處理程序,並將這個匿名類最終提取到頂級類中。
5,您還可以設置特定於Channel實現的參數。 我們正在編寫一個TCP / IP服務器,因此我們可以設置套接字選項,如tcpNoDelay和keepAlive。 請參閱ChannelOption的apidocs和特定的ChannelConfig實現,以獲得有關支持的ChannelOptions的概述。
6,你有沒有註意到option()和childOption()? option()用於接受傳入連接的NioServerSocketChannel。 childOption()用於在這種情況下由主服務器通道(即NioServerSocketChannel)接受的通道。
7,我們現在準備好了。 剩下的是綁定到端口並啟動服務器。 這裏,我們綁定到機器中所有NIC(網絡接口卡)的端口8080。 您現在可以根據需要調用bind()方法多次(使用不同的綁定地址)。
恭喜! 你剛剛完成了Netty上的第一臺服務器。
========================================================
如何查看接收到的數據
現在我們已經寫了我們的第一個服務器,我們需要測試它是否真的有效。 測試它的最簡單的方法是使用telnet命令。 例如,您可以在命令行中輸入telnet localhost 8080,然後輸入發送內容。
但是如何證明服務器收到了信息了呢?我們已經知道,在收到數據時,會調用channelRead()方法。 讓我們把一些代碼放到DiscardServerHandler的channelRead()方法中:
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ByteBuf in = (ByteBuf) msg; try { while (in.isReadable()) { // (1) System.out.print((char) in.readByte()); System.out.flush(); } } finally { ReferenceCountUtil.release(msg); // (2) } }
1,這個無效循環實際上可以簡化為:System.out.println(in.toString(io.netty.util.CharsetUtil.US_ASCII))
2,可選的操作:in.release()
釋放資源
接下來,你就可以看到打印的信息了。
========================================================
寫一個回顯的服務器
到目前為止,我們一直在消費數據,沒有任何反應。 然而,服務器通常應該響應請求。 讓我們學習如何通過實現ECHO協議向客戶端寫入響應消息,其中任何接收到的數據都將被發回。與前面部分實現的丟棄服務器的唯一區別在於它將接收到的數據發回,而不是將接收到的數據輸出到控制臺。 因此,再次修改channelRead()方法是足夠的:
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ctx.write(msg); // (1) ctx.flush(); // (2) }
1,ChannelHandlerContext對象提供了各種操作,可以觸發各種I / O事件和操作。 在這裏,我們調用write(Object)來逐字寫入接收到的消息。 請註意,我們沒有像DISCARD示例中那樣發布接收的消息。 這是因為,當Netty發布給電子郵件時,Netty會為您發布。
2,ctx.write(Object)不會將消息寫入管道。 它在內部緩沖,然後通過ctx.flush()刷出數據。 或者,為簡潔起見,您可以調用ctx.writeAndFlush(msg)。
===========================================================
寫一個時間服務器
本節中實現的協議是TIME協議。 與前面的示例不同的是,它發送一個包含32位整數的消息,而不接收任何請求,並在發送消息後關閉連接。 在此示例中,您將學習如何構建和發送消息,並在完成時關閉連接。
因為我們將忽略任何接收到的數據,而是在建立連接後立即發送消息,這次我們不能使用channelRead()方法。 相反,我們應該覆蓋channelActive()方法。 以下是實現:
服務引擎:
package io.netty.example.time; public class TimeServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(final ChannelHandlerContext ctx) { // (1) final ByteBuf time = ctx.alloc().buffer(4); // (2) time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L)); final ChannelFuture f = ctx.writeAndFlush(time); // (3) f.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) { assert f == future; ctx.close(); } }); // (4) } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }
1,如上所述,當建立連接並準備好生成連接時,將調用channelActive()方法。 我們來寫一個32位整數,表示這個方法當前的時間。
2,要發送新消息,我們需要分配一個包含消息的新緩沖區。 我們要寫一個32位整數,因此我們需要一個容量至少為4個字節的ByteBuf。 通過ChannelHandlerContext.alloc()獲取當前的ByteBufAllocator,並分配一個新的緩沖區。
3,接下來,我構造消息:
但等等,消息切割在哪裏?在NIO發送消息之前,我們是否曾經調用過java.nio.ByteBuffer.flip()? ByteBuf沒有這樣的方法,因為它有兩個指針;一個用於讀操作,另一個用於寫操作。當讀寫器索引沒有改變時,寫入索引會增加。讀者索引和寫索引分別表示消息的開始和結束位置。
相比之下,NIO緩沖區不能提供一個幹凈的方式來確定消息內容的起始和結束位置,而不需要調用flip方法。當您忘記翻轉緩沖區時,您將會遇到麻煩,因為沒有發送任何數據或不正確的數據。在Netty中不會發生這樣的錯誤,因為我們針對不同的操作類型有不同的指針。你會發現它使你的操作更簡單。
另外要註意的是,ChannelHandlerContext.write()(和writeAndFlush())方法返回一個ChannelFuture。 ChannelFuture表示尚未發生的I / O操作。這意味著任何請求的操作可能尚未執行,因為所有操作在Netty中都是異步的。例如,即使在發送消息之前,以下代碼也可能會關閉連接:
Channel ch = ...;
ch.writeAndFlush(message);
ch.close();
因此,您需要在ChannelFuture完成之後調用close()方法,該方法由write()方法返回,並且在寫入操作完成後通知其監聽器。 請註意,close()也可能不會立即關閉連接,並返回ChannelFuture。
4,當寫請求完成後,我們如何得到通知? 這就像將ChannelFutureListener添加到返回的ChannelFuture一樣簡單。 在這裏,我們創建了一個新的匿名ChannelFutureListener,當操作完成時關閉通道。
f.addListener(ChannelFutureListener.CLOSE);
要測試我們的時間服務器是否按預期工作,可以使用UNIX rdate命令:
$ rdate -o <port> -p <host>
其中<port>是在main()方法中指定的端口號,<host>通常是localhost。
=======================================================================
編寫一個時間的客戶端
與DISCARD服務器和回顯服務器不同,我們需要TIME協議的客戶端,因為人類無法將32位二進制數據轉換為日歷上的日期。 在本節中,我們將討論如何確保服務器正常工作,並學習如何使用Netty編寫客戶端。
Netty中服務器和客戶端之間最大和唯一的區別是使用不同的Bootstrap和Channel實現。 請看下面的代碼:
package io.netty.example.time; public class TimeClient { public static void main(String[] args) throws Exception { String host = args[0]; int port = Integer.parseInt(args[1]); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); // (1) b.group(workerGroup); // (2) b.channel(NioSocketChannel.class); // (3) b.option(ChannelOption.SO_KEEPALIVE, true); // (4) b.handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new TimeClientHandler()); } }); // Start the client. ChannelFuture f = b.connect(host, port).sync(); // (5) // Wait until the connection is closed. f.channel().closeFuture().sync(); } finally { workerGroup.shutdownGracefully(); } } }
1,Bootstrap類似於ServerBootstrap,除了它用於非服務器通道,如客戶端或無連接通道。
2,如果只指定一個EventLoopGroup,它將同時用作一個老板組和一個工作組。 雖然老板的工作人員不用於客戶端。
3,NioSocketChannel代替NioServerSocketChannel,用於創建客戶端channel。
4,請註意,我們不使用childOption(),這與ServerBootstrap不同,因為客戶端SocketChannel沒有父級。
5,我們應該調用connect()方法而不是bind()方法。
可以看到,它與服務器端代碼沒有什麽不同。 ChannelHandler的實現如何? 它應該從服務器收到一個32位整數,將其翻譯成可讀的格式,打印翻譯時間,並關閉連接:
package io.netty.example.time; import java.util.Date; public class TimeClientHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ByteBuf m = (ByteBuf) msg; // (1) try { long currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L; System.out.println(new Date(currentTimeMillis)); ctx.close(); } finally { m.release(); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }
1,在TCP / IP中,Netty將從對等體發送到ByteBuf的數據。
它看起來很簡單,與服務器端的示例看起來沒有什麽不同。 但是,這個處理程序有時會拒絕提高IndexOutOfBoundsException。 我們討論為什麽會在下一節中發生這種情況。
===================================================
總結:這主要翻譯官方文獻,內容還是很清晰的。
地址:http://netty.io/wiki/user-guide-for-4.x.html#wiki-h3-11
也可以官方下載相應的例子進行研究
netty 入門(一)