1. 程式人生 > >Netty事件監聽和處理(下)

Netty事件監聽和處理(下)

關註 proto cte meta pro sse cti 讀取 線程模型

上一篇 介紹了事件監聽、責任鏈模型、socket接口和IO模型、線程模型等基本概念,以及Netty的整體結構,這篇就來說下Netty三大核心模塊之一:事件監聽和處理。

前面提到,Netty是一個NIO框架,它將IO通道的建立、可讀、可寫等狀態變化,抽象成事件,以責任鏈的方式進行傳遞,可以在處理鏈上插入自定義的Handler,對感興趣的事件進行監聽和處理。

通過介紹,你會了解到:

  • 事件監聽和處理模型
  • 事件監聽:EventLoop
  • 事件處理:ChannelPipeline和ChannelHandler
  • 使用Netty實現Websocket協議

文章末尾有福利 ~

事件監聽和處理模型

進行網絡編程時,一般的編寫過程是這樣的:

  • 創建服務端Socket,監聽某個端口;
  • 當有客戶端連接時,會創建一個新的客戶端Socket,監聽數據的可讀、可寫狀態,每一個連接請求都會創建一個客戶端Socket;
  • 讀取和寫入數據都會調用Socket提供的接口,接口列表在上一篇提到過;

傳統的模型,每個客戶端Socket會創建一個單獨的線程監聽socket事件,一方面系統可創建的線程數有限,限制了並發數,一方面線程過多,線程切換頻繁,導致性能嚴重下降。

隨著操作系統IO模型的發展,可以采用多路復用IO,一個線程監聽多個Socket,另外,服務端處理客戶端連接,與客戶端Socket的監聽,可以在不同的線程進行處理。

Netty就是采用多路復用IO進行事件監聽,另外,使用不同的線程分別處理客戶端的連接、數據讀寫。

整個處理結構如下圖,簡單說明下:

  • Boss EventLoopGroup主要處理客戶端的connect事件,包含多個EventLoop,每個EventLoop一個線程;
  • Worker EventLoopGroup主要處理客戶端Socket的數據read、write事件,包含多個EventLoop,每個EventLoop一個線程;
  • 無論是Boos還是Worker,事件的處理都是通過Channel Pipleline組織的,它是責任鏈模式的實現,包含一個或多個Handler;
  • 偵聽一個端口,只會綁定到Boss EventLoopGroup中的一個Eventloop;
  • Worker EventLoopGroup中的一個Eventloop,可以監聽多個客戶端Socket;

技術分享圖片

EventLoop

一個EventLoop其實和一個特定的線程綁定, 並且在其生命周期內, 綁定的線程都不會再改。

EventLoop肩負著兩種任務:

  • 第一個是作為 IO 線程, 執行與 Channel 相關的 IO 操作, 包括 調用select等待就緒的IO事件、讀寫數據與數據的處理等;
  • 第二個任務是作為任務隊列, 執行 taskQueue 中的任務, 例如用戶調用eventLoop.schedule提交的定時任務也是這個線程執行的;

第一個任務比較好理解,主要解釋下第二個:從socket數據到數據處理,再到寫入響應數據,Netty都在一個線程中處理,主要是為了線程安全考慮,減少競爭和線程切換,通過任務隊列的方式,可以在用戶線程提交處理邏輯,在Eventloop中執行。

整個EventLoop幹的事情就是select -> processIO -> runAllTask,processIO處理IO事件相關的邏輯,runAllTask處理任務隊列中的任務,如果執行的任務過多,會影響IO事件的處理,所以會限制任務處理的時間,整個處理過程如下圖:

技術分享圖片

EventLoop的run代碼如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
protected void run() {
for (; ; ) {
oldWakenUp = wakenUp.getAndSet(false);
try {
if (hasTasks()) { //如果有任務,快速返回
selectNow();
} else {
select(); //如果沒任務,等待事件返回
if (wakenUp.get()) {
selector.wakeup();
}
}
cancelledKeys = 0;
final long ioStartTime = System.nanoTime();
needsToSelectAgain = false;

//處理IO事件
if (selectedKeys != null) {
processSelectedKeysOptimized(selectedKeys.flip());
} else {
processSelectedKeysPlain(selector.selectedKeys());
}

//計算IO處理時間
final long ioTime = System.nanoTime() - ioStartTime;
final int ioRatio = this.ioRatio; //默認為50

//處理提交的任務
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);

if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
break;
}
}
} catch (Throwable t) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
}
}
}

ChannelPipeline和ChannelHandler

ChannelPipeline是一個接口,其有一個默認的實現類DefaultChannelPipeline,內部有兩個屬性:head和tail,
這兩者都實現了ChannelHandler接口,對應處理鏈的頭和尾。

1
2
3
4
5
6
7
8
9
10
11
 protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);

tail = new TailContext(this);
head = new HeadContext(this);

head.next = tail;
tail.prev = head;
}

每個Channel創建時,會創建一個ChannelPipeline對象,來處理channel的各種事件,可以在運行時動態進行動態修改其中的 ChannelHandler。

ChannelHandler承載業務處理邏輯的地方,我們接觸最多的類,可以自定義Handler,加入處理鏈中,實現自定義邏輯。

ChannelHandler 可分為兩大類:ChannelInboundHandler 和 ChannelOutboundHandle,這兩接口分別對應入站和出站消息的處理,對應數據讀取和數據寫入。它提供了接口方法供我們實現,處理各種事件。

1
2
3
4
5
6
7
8
9
10
public interface ChannelInboundHandler extends ChannelHandler {
void channelRegistered(ChannelHandlerContext ctx) throws Exception;
void channelUnregistered(ChannelHandlerContext ctx) throws Exception;
void channelActive(ChannelHandlerContext ctx) throws Exception;
void channelInactive(ChannelHandlerContext ctx) throws Exception;
void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception;
void channelReadComplete(ChannelHandlerContext ctx) throws Exception;
void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception;
void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception;
}

自定義Handler時,一般繼承ChannelInboundHandlerAdapter或 ChannelOutboundHandlerAdapter。

需要註意的是,不建議在 ChannelHandler 中直接實現耗時或阻塞的操作,因為這可能會阻塞 Netty 工作線程,導致 Netty 無法及時響應 IO 處理。

技術分享圖片

使用Netty實現Websocket協議

Websocket協議

不是本篇的重點,簡單說明下:

  • 是一種長連接協議,大部分瀏覽器都支持,通過websocket,服務端可以主動發消息給客戶端;
  • Websocket協議,在握手階段使用HTTP協議,握手完成之後,走Websocket自己的協議;
  • Websocket是一種二進制協議;
初始化

Netty提供了ChannelInitializer類方便我們初始化,創建WebSocketServerInitializer類,繼承ChannelInitializer類,用於添加ChannelHandler:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class WebSocketServerInitializer extends ChannelInitializer<SocketChannel> {

@Resource
private CustomTextFrameHandler customTextFrameHandler;

@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("codec-http", new HttpServerCodec());
pipeline.addLast("aggregator", new HttpObjectAggregator(65536));

pipeline.addLast("websocket-protocal-handler",new WebSocketServerProtocolHandler());
pipeline.addLast("custome-handler", customTextFrameHandler);
}
}

分析下這幾個Handler,都是Netty默認提供的:

  • HttpServerCodec:用於解析Http請求,主要在握手階段進行處理;
  • HttpObjectAggregator:用於合並Http請求頭和請求體,主要在握手階段進行處理;
  • WebSocketServerProtocolHandler:處理Websocket協議;
  • CustomTextFrameHandler:自定義的Handler,用於添加自己的業務邏輯。

是不是很方便,經過WebSocketServerProtocolHandler處理後,讀取出來的就是文本數據了,不用自己處理數據合包、拆包問題。

CustomTextFrameHandler

自定義的Handler,進行業務處理:

1
2
3
4
5
6
7
8
9
10
11
12
13
public class CustomTextFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
@Override
protected void channelRead0(final ChannelHandlerContext ctx, TextWebSocketFrame frame) throws Exception {
final String content = frame.text();
System.out.println("接收到數據:"+content);

// 回復數據
TextWebSocketFrame respFrame = new TextWebSocketFrame("我收到了你的數據");
if (ctx.channel().isWritable()) {
ChannelFuture future = ctx.writeAndFlush(respFrame);
}
}
}

福利說明

最後,說下福利:小愛音箱F碼。

準備了2份,主要為了感謝「微信公眾號」和「掘金社區」的朋友,每一份包括1個小愛音箱F碼和1個小愛音箱 mini F碼。

小米手機F碼源自於英文單詞”Friend”,是小米公司提供給小米核心用戶及為小米做出貢獻的網友的優先購買權,如果您有小米F碼的話無需等待即可直接利用小米F碼購買相關產品!

簡單來說,F碼就是不用搶了,可以直接購買 ~

抽獎截止時間

4月9號中午12點

抽獎規則
掘金社區
  • 需要關註我的掘金賬號才有效,個人主頁;

  • 使用微信抽獎助手隨機抽取for掘金社區;
    技術分享圖片

微信公眾號
  • 需要關註我的微信公眾號才有效;
    技術分享圖片

  • 使用微信抽獎助手隨機抽取for微信公眾號;
    技術分享圖片

Netty事件監聽和處理(下)