netty原始碼深入研究(從客戶端入手)第四篇(讀寫超時詳解)
阿新 • • 發佈:2019-01-09
怎麼設定讀寫超時的監聽函式呢,首先從文件開始,或者看看官方有沒有例子,一般任何平臺的官方都會或多或少的提供例子。
官方文件有這樣一個類new IdleStateHandler(readerIdleTimeSeconds, writerIdleTimeSeconds, allIdleTimeSeconds),我這裡就用中式英語翻譯,大體的意思就是空閒狀態的處理者,第一個引數設定讀超時時間,第二個引數設定寫超時時間,第三個設定總的超時時間。
怎麼用呢,咱們先看看它的父類是什麼public class IdleStateHandler extends ChannelDuplexHandler,那麼ChannelDuplexHandler
ch.pipeline().addLast("IdleState",
new IdleStateHandler(5, 5, 10));
構造方法裡,判斷設定的時間值,如果小於等於0,設定時間為0,總時間取預設值和設定值的最大值。直接點類進入後我們只能跟蹤到構造方法,其他方法基本上都是被回撥的方法,看這個public IdleStateHandler(boolean observeOutput, long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit) { if (unit == null) { throw new NullPointerException("unit"); } this.observeOutput = observeOutput; if (readerIdleTime <= 0) { readerIdleTimeNanos = 0; } else { readerIdleTimeNanos = Math.max(unit.toNanos(readerIdleTime), MIN_TIMEOUT_NANOS); } if (writerIdleTime <= 0) { writerIdleTimeNanos = 0; } else { writerIdleTimeNanos = Math.max(unit.toNanos(writerIdleTime), MIN_TIMEOUT_NANOS); } if (allIdleTime <= 0) { allIdleTimeNanos = 0; } else { allIdleTimeNanos = Math.max(unit.toNanos(allIdleTime), MIN_TIMEOUT_NANOS); } }
public void handlerAdded(ChannelHandlerContext ctx) throws Exception { if (ctx.channel().isActive() && ctx.channel().isRegistered()) { // channelActive() event has been fired already, which means this.channelActive() will // not be invoked. We have to initialize here instead. initialize(ctx); } else { // channelActive() event has not been fired yet. this.channelActive() will be invoked // and initialization will occur there. } }
handlerAdded()方法在什麼時候被執行呢,第一篇講過連線之前,要進行註冊,那麼跟進去看看是不是那個地方執行的,該方法在DefaultChannelPipeline的callHandlerAdded0方法裡呼叫
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
//檢查handler是否合法
checkMultiplicity(handler);
newCtx = newContext(group, filterName(name, handler), handler);
addLast0(newCtx);
// If the registered is false it means that the channel was not registered on an eventloop yet.
// In this case we add the context to the pipeline and add a task that will call
// ChannelHandler.handlerAdded(...) once the channel is registered.
if (!registered) {
newCtx.setAddPending();
callHandlerCallbackLater(newCtx, true);
return this;
}
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
newCtx.setAddPending();
executor.execute(new Runnable() {
@Override
public void run() {
//被呼叫
callHandlerAdded0(newCtx);
}
});
return this;
}
}
//被呼叫
callHandlerAdded0(newCtx);
return this;
}
看程式碼中文註釋部分,此方法原來是在新增的時候被呼叫,和註冊沒關係。
如果此時連線已經建立,那麼就初始化這個管道,0k,開始執行initialize方法,
private void initialize(ChannelHandlerContext ctx) {
//第一次進入的時候state為預設值,肯定不是1和2,跳過
switch (state) {
case 1:
case 2:
return;
}
state = 1;
//設定是否監聽寫的資訊,預設不監聽
initOutputChanged(ctx);
//當前時間
lastReadTime = lastWriteTime = ticksInNanos();
//假如設定的超時時間有效則啟動監聽任務
if (readerIdleTimeNanos > 0) {
readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx),
readerIdleTimeNanos, TimeUnit.NANOSECONDS);
}
if (writerIdleTimeNanos > 0) {
writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx),
writerIdleTimeNanos, TimeUnit.NANOSECONDS);
}
if (allIdleTimeNanos > 0) {
allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx),
allIdleTimeNanos, TimeUnit.NANOSECONDS);
}
}
啟動任務最後還是加入到NioEventLoop的訊息佇列中,並延時執行,任務執行每次都是在讀寫以後。接下來看一下這幾個任務都幹了什麼
讀任務的程式碼:
protected void run(ChannelHandlerContext ctx) {
long nextDelay = readerIdleTimeNanos;
//是否正在讀,每次呼叫管道讀的時候reading會變成true,而讀完後reading變成false
if (!reading) {
//沒有讀則計算時間
nextDelay -= ticksInNanos() - lastReadTime;
}
//如果計算值為負數,則超時
if (nextDelay <= 0) {
// Reader is idle - set a new timeout and notify the callback.
//進行下一次統計任務
readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS);
boolean first = firstReaderIdleEvent;
firstReaderIdleEvent = false;
try {
//通知超時
IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first);
channelIdle(ctx, event);
} catch (Throwable t) {
ctx.fireExceptionCaught(t);
}
} else {
// Read occurred before the timeout - set a new timeout with shorter delay.
//進行下一次統計任務
readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
}
}
}
通過上面程式碼可以發現,每次執行到任務時獲取當前的時間減去上一次讀取的時間,如果大於最大讀取時間的話,那麼回撥超時介面。那麼回撥的是什麼呢?跟進看一下
protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
ctx.fireUserEventTriggered(evt);
}
最後又通過管道包裝類把回撥分發出去
@Override
public ChannelHandlerContext fireUserEventTriggered(final Object event) {
invokeUserEventTriggered(findContextInbound(), event);
return this;
}
最後又執行到迴圈呼叫佇列方法的地方
private void invokeUserEventTriggered(Object event) {
if (invokeHandler()) {
try {
((ChannelInboundHandler) handler()).userEventTriggered(this, event);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireUserEventTriggered(event);
}
}
因為將管道新增進去的時候,invokeHander已然返回true,所以必然執行((ChannelInboundHandler) handler()).userEventTriggered(this, event);
也就是每一個讀管道的
public void userEventTriggered(ChannelHandlerContext ctx, Object evt)
throws Exception {
// TODO Auto-generated method stub
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state().equals(IdleState.ALL_IDLE)) {
// 未進行寫操作傳送心跳包
String heratBeat = "{\"event\":\"HEARTBEAT\"}";
ctx.write(Unpooled.copiedBuffer(heratBeat.getBytes()));
ctx.flush();
}
}
super.userEventTriggered(ctx, evt);
}
這個方法,我們可以在這個裡面做一些事情,比如一定時間沒有和伺服器發信息了,那麼傳送心跳,看super.userEventTriggered(ctx, evt),它的父類方法都做了什麼
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
ctx.fireUserEventTriggered(evt);
}
奧,這不是又把事件分發給佇列的下一個管道處理類了嗎,預設的解析器我們不會重寫userEventTriggered方法,所以他會直接分發給下一個管道類處理。
那麼繼續,從上面的讀超時任務,知道有一個最後讀取資料的時間變數lastReadTime,那麼它是怎麼計算的呢?
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
if ((readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) && reading) {
lastReadTime = ticksInNanos();
reading = false;
}
ctx.fireChannelReadComplete();
}
這個值是讀資料完成的回撥方法裡執行的,第二篇有詳細介紹
public final void read() {
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
final ByteBufAllocator allocator = config.getAllocator();
final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
allocHandle.reset(config);
ByteBuf byteBuf = null;
boolean close = false;
try {
do {
byteBuf = allocHandle.allocate(allocator);
allocHandle.lastBytesRead(doReadBytes(byteBuf));
if (allocHandle.lastBytesRead() <= 0) {
// nothing was read. release the buffer.
byteBuf.release();
byteBuf = null;
close = allocHandle.lastBytesRead() < 0;
break;
}
allocHandle.incMessagesRead(1);
readPending = false;
//讀管道處理操作回撥
pipeline.fireChannelRead(byteBuf);
byteBuf = null;
} while (allocHandle.continueReading());
//讀取完成的操作回撥
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
if (close) {
closeOnRead(pipeline);
}
} catch (Throwable t) {
handleReadException(pipeline, byteBuf, t, close, allocHandle);
} finally {
// Check if there is a readPending which was not processed yet.
// This could be for two reasons:
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
//
// See https://github.com/netty/netty/issues/2254
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}
}
看程式碼中文註釋,現在讀超時的思路已經清晰。接著看寫任務做了哪些
protected void run(ChannelHandlerContext ctx) {
long lastWriteTime = IdleStateHandler.this.lastWriteTime;
long nextDelay = writerIdleTimeNanos - (ticksInNanos() - lastWriteTime);
if (nextDelay <= 0) {
// Writer is idle - set a new timeout and notify the callback.
writerIdleTimeout = schedule(ctx, this, writerIdleTimeNanos, TimeUnit.NANOSECONDS);
boolean first = firstWriterIdleEvent;
firstWriterIdleEvent = false;
try {
if (hasOutputChanged(ctx, first)) {
return;
}
IdleStateEvent event = newIdleStateEvent(IdleState.WRITER_IDLE, first);
channelIdle(ctx, event);
} catch (Throwable t) {
ctx.fireExceptionCaught(t);
}
} else {
// Write occurred before the timeout - set a new timeout with shorter delay.
writerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
}
}
}
執行邏輯和讀差不多,其他跳過直接看lastWriteTime這個引數哪來的 private final ChannelFutureListener writeListener = new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
lastWriteTime = ticksInNanos();
firstWriterIdleEvent = firstAllIdleEvent = true;
}
};
哎要,它是在回撥函式中設定的值
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
// Allow writing with void promise if handler is only configured for read timeout events.
if (writerIdleTimeNanos > 0 || allIdleTimeNanos > 0) {
ChannelPromise unvoid = promise.unvoid();
unvoid.addListener(writeListener);
ctx.write(msg, unvoid);
} else {
ctx.write(msg, promise);
}
}
是在write方法中加入的回撥值,額什麼時候執行write呢
private static boolean isOutbound(ChannelHandler handler) {
return handler instanceof ChannelOutboundHandler;
}
只要實現了此介面就能監聽的管道的寫訊息,所以IdleStateHandler實現了implements ChannelOutboundHandler介面,寫方法當然會執行,就和第二篇寫訊息流程一樣,不再具體分析。
第五篇的時候,我會來一個netty的總結,把類關係圖弄一下,那樣感覺更直觀。