1. 程式人生 > >[編織消息框架][netty源碼分析]9 Promise 實現類DefaultPromise職責與實現

[編織消息框架][netty源碼分析]9 Promise 實現類DefaultPromise職責與實現

flush alt read true boolean listener seconds ren ces

netty Future是基於jdk Future擴展,以監聽完成任務觸發執行
Promise是對Future修改任務數據
DefaultPromise是重要的模板類,其它不同類型實現基本是一層簡單的包裝,如DefaultChannelPromise
主要是分析await是如何等侍結果的

技術分享

public interface Future<V> extends java.util.concurrent.Future<V> {
   Future<V> addListener(GenericFutureListener<? extends Future<? super
V>> listener); } public interface Promise<V> extends Future<V> { Promise<V> setSuccess(V result); boolean trySuccess(V result); Promise<V> setFailure(Throwable cause); boolean tryFailure(Throwable cause); boolean setUncancellable(); } public
class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> { @Override public boolean await(long timeout, TimeUnit unit) throws InterruptedException { return await0(unit.toNanos(timeout), true); } private boolean await0(long timeoutNanos, boolean
interruptable) throws InterruptedException { //已完成任務直接忽略 if (isDone()) { return true; } //沒有等侍時間返回處理記錄 if (timeoutNanos <= 0) { return isDone(); } //已中斷拋異常 if (interruptable && Thread.interrupted()) { throw new InterruptedException(toString()); } //checkDeadLock(); //netty 認為是當前線程是死鎖狀態 EventExecutor e = executor(); if (e != null && e.inEventLoop()) { throw new BlockingOperationException(toString()); } long startTime = System.nanoTime(); long waitTime = timeoutNanos; boolean interrupted = false; try { for (;;) { synchronized (this) { if (isDone()) { return true; } //最大檢查次數為 Short.MAX_VALUE //很奇怪的邏輯,處理完後又自減 if (waiters == Short.MAX_VALUE) { throw new IllegalStateException("too many waiters: " + this); } ++waiters; try { //阻塞的代碼只是一行參數1是milliseconds,參數2是輔助用的大於0時milliseconds+1,如果是0的話會無限制阻塞 wait(waitTime / 1000000, (int) (waitTime % 1000000)); } catch (InterruptedException e) { if (interruptable) { throw e; } else { interrupted = true; } } finally { waiters--; } } //這裏是double check跟並發無影響的邏輯放在synchronized外面 if (isDone()) { return true; } else { waitTime = timeoutNanos - (System.nanoTime() - startTime); if (waitTime <= 0) { return isDone(); } } } } finally { if (interrupted) { Thread.currentThread().interrupt(); } } } } public class DefaultChannelPromise extends DefaultPromise<Void> implements ChannelPromise, FlushCheckpoint { private final Channel channel; public DefaultChannelPromise(Channel channel) { this.channel = channel; } public DefaultChannelPromise(Channel channel, EventExecutor executor) { super(executor); this.channel = channel; } }

[編織消息框架][netty源碼分析]9 Promise 實現類DefaultPromise職責與實現