Netty專欄 (一)——— Netty初步介紹和簡單Demo
Netty初步介紹和簡單Demo
@author 魯偉林 記錄《Netty 實戰》中各章節學習過程,寫下一些自己的思考和總結,幫助使用Netty框架的開發技術人員們,能夠有所得,避免踩坑。 本部落格目錄結構將嚴格按照書本《Netty 實戰》,省略與Netty無關的內容,可能出現跳小章節。 本部落格中涉及的完整程式碼,
GitHub地址: https://github.com/thinkingfioa/netty-learning/tree/master/netty-in-action。 本人部落格地址: https://blog.csdn.net/thinking_fioa
1. Netty - 非同步和事件驅動
1. Netty 能夠幫助搭建允許系統能夠擴充套件到支援150000名併發使用者。
2. Netty 設計關鍵: 非同步 + 事件驅動
1.1 Java網路程式設計(BIO)
典型的BIO服務端:
1. 一個主執行緒在某個port監聽,等待客戶端連線。
2. 當接收到客戶端發起的連線時,建立一個新的執行緒去處理客戶端請求。
3. 主執行緒重新回到port監聽,等待下一個客戶端連線。
缺點:
1. 每個新的客戶端Socket都需要建立一個新的Thread處理,將會導致大量的執行緒處於休眠狀態。
2. 每個執行緒都有呼叫棧的記憶體分配,連線數非常多時,耗費較多記憶體。
3. 連線數比較多時,建立大量執行緒,上下文切換所帶來的開銷較大。
程式碼:
public void serve(int port) throws IOException { // 建立Socket ServerSocket serverSocket = new ServerSocket(port); // 等待客戶端連線 Socket clientSocket = serverSocket.accept(); // 建立輸入流 BufferedReader in = new BufferedReader(new InputStreamReader(clientSocket.getInputStream())); PrintWriter out = new PrintWriter(clientSocket.getOutputStream(), true); String request, response; while((request = in.readLine()) != null) { if("Done".equals(request)) { break; } response = processRequest(request); out.println(response); } }
1.2 Java NIO
1. 使用Selector來實現Java的非阻塞I/O操作。將多個Socket的讀寫狀態繫結到Selector上,允許在任何時間檢查任意的讀操作/寫操作的完成狀態。
2. 允許單個執行緒處理多個併發的連線。
1.3 Netty的核心元件
Netty的主要構件塊:
1. Channel
2. 回撥
3. Future
4. 事件和ChannelHandler
1.3.1 Channel
Channel是傳入(入站)或者傳出(出站)資料的載體(如一個檔案、一個Socket或一個硬體裝置)。可以被開啟或者被關閉,連線或斷開連線。
1.3.2 回撥
回撥只是:先寫一段程式碼,該段程式碼在將來某個適當的時候會被執行。Netty大量使用了回撥,比如:某ChannelHandler中的channelActive()方法則是一個回撥,表示連線建立時,請執行該段回撥程式碼。
1.3.3 Future
非同步操作佔位符。在操作完成時,提供結果的訪問。
JDK提供的Future和ChannelFuture對比:
1. JDK提供的Future需要手動檢查對應的操作是否完成,或一直阻塞直到它完成
2. ChannelFuture能夠註冊Listener監聽器,監聽器的回撥函式operationComplete()能非同步的在操作完成時被呼叫。
程式碼:
public static void connect() {
Channel channel = CHANNEL_FROM_SOMEWHERE;
ChannelFuture future = channel.connect(new InetSocketAddress("127.0.0.1", 9080));
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if(future.isSuccess()) {
ByteBuf buf = Unpooled.copiedBuffer("hello", Charset.defaultCharset());
ChannelFuture wf = future.channel().writeAndFlush(buf);
// ...
} else {
// 失敗後可嘗試重連/切換鏈路
future.cause().printStackTrace();
}
}
})
}
1.3.4 事件和ChannelHandler
1. 事件:發生某種事件觸發適當的動作。比如入站觸發事件: 鏈路啟用(channelActive)/資料可讀(channelRead)/發生異常(exceptionCaught)/...
2. Channelhandler:一組為了響應特定事件而被執行的回撥函式。如:ChannelInboundHanderAdapter.java是一個入站事件
1.3.5 Channel和EventLoop關係:
Channel和EventLoop都是Netty核心概念,而且有一些約定俗成的規定,能幫助程式設計和理解:
1. 單個Channel只會對映到單個EventLoop
2. 單個EventLoop可以處理多個Channel(1:n關係)
3. 一個EventLoop在其生命週期內只能繫結到一個執行緒上4. 由於單個Channel在其生命週期中只會有一個I/O執行緒,所以ChannelPipeline中多個ChannelHandler無需關心同步互斥問題
2. 第一款Netty應用程式
1. ChannelHandler用於構建應用業務邏輯。往往封裝了為響應特定事件而編寫的回撥函式
2. 本節主要講解一個超級簡單的Netty應用程式,回顯服務: 客戶端建立連線後,傳送一個或多個訊息。服務端收到後,將訊息返回。
2.3 編寫Echo伺服器
Netty服務端至少需要兩個部分: 一個ChannelHandler + 引導(Bootstrap)
2.3.1 ChannelHandler和業務邏輯
繼承ChannelInboundHandlerAdapter類,感興趣的入站方法:
1. channelRead() - 對於每個傳入的訊息都要呼叫
2. channelReadComplete() - 當前批量讀取中的最後一條資料
3. exceptionCaught() - 讀取操作期間,有異常丟擲時呼叫
程式碼:
@ChannelHandler.Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter{
/**
* 每次傳入的訊息都要呼叫
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf in = (ByteBuf) msg;
System.out.println(
"Server received: " + in.toString(CharsetUtil.UTF_8));
ctx.write(in);
}
/**
* 讀完當前批量中的最後一條資料後,觸發channelReadComplete(...)方法
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx)
throws Exception {
ctx.writeAndFlush(Unpooled.EMPTY_BUFFER)
.addListener(ChannelFutureListener.CLOSE);
}
/**
* 異常捕獲
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx,
Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
解釋:
1. channelRead和channelReadComplete理解:當批量訊息後最後一條資料被channelRead(...)後觸發channelReadComplete事件。
2. ctx.write(...)只是將訊息暫時存放在ChannelOutboundBuffer中,等待flush(...)操作
3. @Sharable註解:本質是宣告該ChannelHandler全域性單例。可被多個Channel安全的共享。標註了@Sharable註解的ChannelHandler請注意不能有對應的狀態
2.3.2 引導伺服器
1. 引導伺服器主要開啟Netty的Channel。並分配對應的EventLoop和ChannelPipeline。
2. 一個Channel只有一個ChannelPipeline。ChannelPipeline是由一組ChannelHandler組成的責任鏈。
程式碼:
EventLoopGroup group = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(group)
.channel(NioServerSocketChannel.class)
.localAddress(new InetSocketAddress(port))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new EchoServerHandler());
}
});
} finally {
group.shutdownGracefully().sync();
}
2.4 編寫Echo客戶端
客戶端將會:
1. 建立連線
2. 傳送訊息
3. 關閉連線
2.4.1 ChannelHandler客戶端邏輯
1. Java是通過GC可達性分析來實現垃圾回收。對於Netty傳輸中的ByteBuf,使用的是引用計數演算法。也就是說:如果你使用了Netty,需要你親自考慮是否需要手動釋放物件。判斷方法,後文將會給出
2. 擴充套件SimpleChannelInboundHandler類處理任務的Handler,無需手動釋放物件。SimpleChannelInboundHandler.java中方法channelRead()中會負責釋放引用。
3. 客戶端傳送訊息條數和服務端接收的訊息條數是不對應的。除非處理了TCP的粘包黏包。
程式碼:
// SimpleChannelInboundHandler<T>中channelRead方法負責釋放物件msg引用
public abstract class SimpleChannelInboundHandler<I> ...{
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
boolean release = true;
try {
// ...
} finally {
if (autoRelease && release) {
// 減少物件msg引用計數
ReferenceCountUtil.release(msg);
}
}
}
}
問:ChannelHandler中何時需要主動釋放引用?
1. 擴充套件的類不是: SimpleChannelInboundHandler,且該物件msg不會傳給下一個ChannelHandler
2. 擴充套件的類不是: SimpleChannelInboundHandler,且該物件msg不會被ctx.write(...)
2.4.2 引導客戶端
給出引導客戶端關鍵程式碼,完整程式碼請參考地址
程式碼:
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.remoteAddress(new InetSocketAddress(host, port))
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch)
throws Exception {
ch.pipeline().addLast(
new EchoClientHandler());
}
});
// 下面兩行程式碼可以刪除
ChannelFuture f = b.connect().sync();
f.channel().closeFuture().sync();
} finally {
group.shutdownGracefully().sync();
}