Hystrix Source Code Tips: How Call Timeouts Are Implemented
阿新 • Published: 2017-12-18
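Before executing a command, AbstractCommand first checks whether the execution timeout is enabled. Only when it is enabled does it lift a HystrixObservableTimeoutOperator onto the execution; otherwise the command runs with no timeout handling at all.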
    if (properties.executionTimeoutEnabled().get()) {
        execution = executeCommandWithSpecifiedIsolation(_cmd)
                .lift(new HystrixObservableTimeoutOperator<R>(_cmd));
    } else {
        execution = executeCommandWithSpecifiedIsolation(_cmd);
    }
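The same "decorate only when enabled" branch can be illustrated without RxJava. The following is a minimal sketch that swaps in the JDK's `CompletableFuture.orTimeout` (Java 9+) for the lifted operator; it is not how Hystrix does it. The class `ConditionalTimeoutSketch` and the methods `execute`, `buildExecution`, and `outcome` are hypothetical names introduced only for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class ConditionalTimeoutSketch {

    // Hypothetical stand-in for executeCommandWithSpecifiedIsolation:
    // finishes with "done" after workMillis on a background thread.
    static CompletableFuture<String> execute(long workMillis) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(workMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "done";
        });
    }

    // Mirrors the branch above: attach timeout handling only when enabled.
    static CompletableFuture<String> buildExecution(boolean timeoutEnabled,
                                                    long timeoutMillis,
                                                    long workMillis) {
        CompletableFuture<String> execution = execute(workMillis);
        if (timeoutEnabled) {
            // orTimeout completes the future exceptionally with a
            // TimeoutException if it is still pending after the delay.
            return execution.orTimeout(timeoutMillis, TimeUnit.MILLISECONDS);
        }
        return execution;
    }

    // Waits for the result and reports "done", "timeout", or "error".
    static String outcome(boolean timeoutEnabled, long timeoutMillis, long workMillis) {
        try {
            return buildExecution(timeoutEnabled, timeoutMillis, workMillis).get();
        } catch (ExecutionException e) {
            return e.getCause() instanceof TimeoutException ? "timeout" : "error";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println(outcome(true, 50, 500));   // timeout enabled, slow work
        System.out.println(outcome(false, 50, 200));  // timeout disabled, same kind of work
    }
}
```

With the timeout disabled, the slow task is simply awaited to completion, which matches the `else` branch above: no timer is ever registered for the command.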
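At the core of HystrixObservableTimeoutOperator is an internal timer. When the timeout elapses, the timer tries to move the command status from NOT_EXECUTED to TIMED_OUT. If that succeeds, it sends a TIMEOUT event to the eventNotifier, unsubscribes from the original execution, and delivers a HystrixTimeoutException to the subscriber through onError.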
    private static class HystrixObservableTimeoutOperator<R> implements Operator<R, R> {
        ....
        @Override
        public Subscriber<? super R> call(final Subscriber<? super R> child) {
            final CompositeSubscription s = new CompositeSubscription();
            child.add(s);

            /*
             * Define the action to perform on timeout outside of the TimerListener to it can capture the HystrixRequestContext
             * of the calling thread which doesn't exist on the Timer thread.
             */
            final HystrixContextRunnable timeoutRunnable = new HystrixContextRunnable(originalCommand.concurrencyStrategy, new Runnable() {
                @Override
                public void run() {
                    child.onError(new HystrixTimeoutException());
                }
            });

            TimerListener listener = new TimerListener() {
                @Override
                public void tick() {
                    // if we can go from NOT_EXECUTED to TIMED_OUT then we do the timeout codepath
                    // otherwise it means we lost a race and the run() execution completed or did not start
                    if (originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.TIMED_OUT)) {
                        // report timeout failure
                        originalCommand.eventNotifier.markEvent(HystrixEventType.TIMEOUT, originalCommand.commandKey);
                        // shut down the original request
                        s.unsubscribe();
                        timeoutRunnable.run();
                        //if it did not start, then we need to mark a command start for concurrency metrics, and then issue the timeout
                    }
                }

                @Override
                public int getIntervalTimeInMilliseconds() {
                    return originalCommand.properties.executionTimeoutInMilliseconds().get();
                }
            };
            ...
        }
    }
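The key idea in tick() is the compare-and-set: the timer and the normal completion path both try to move the status away from NOT_EXECUTED, and only the winner gets to act. Below is a minimal, JDK-only sketch of that race. It is not Hystrix code: the class `TimeoutRaceSketch` and the method `race` are hypothetical, a `ScheduledExecutorService` stands in for the Hystrix timer, and the `COMPLETED` status is an assumption about what the completion side sets, since the excerpt above only shows NOT_EXECUTED and TIMED_OUT.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class TimeoutRaceSketch {

    // COMPLETED is assumed here; the excerpt only shows the other two values.
    enum TimedOutStatus { NOT_EXECUTED, COMPLETED, TIMED_OUT }

    // Races simulated work (workMillis) against a timer (timeoutMillis)
    // and returns whichever status won the compare-and-set.
    static TimedOutStatus race(long timeoutMillis, long workMillis) {
        AtomicReference<TimedOutStatus> status =
                new AtomicReference<>(TimedOutStatus.NOT_EXECUTED);
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        try {
            // Timer side, playing the role of tick(): only the transition
            // NOT_EXECUTED -> TIMED_OUT triggers the timeout path.
            timer.schedule(() -> {
                if (status.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.TIMED_OUT)) {
                    // Hystrix would mark the TIMEOUT event, unsubscribe,
                    // and call onError here.
                }
            }, timeoutMillis, TimeUnit.MILLISECONDS);

            // Simulated command execution on the calling thread.
            try {
                Thread.sleep(workMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }

            // Completion side: succeeds only if the timer has not fired yet.
            status.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.COMPLETED);
            return status.get();
        } finally {
            timer.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(race(50, 500));  // work is slower than the timeout
        System.out.println(race(500, 10));  // work finishes well before the timeout
    }
}
```

Because both sides use compareAndSet from the same starting state, exactly one of them can succeed, so a command is never reported as both completed and timed out.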