1. 程式人生 > >【一起學原始碼-微服務】Hystrix 原始碼三:Hystrix核心流程:Hystix降級、熔斷等原理剖析

【一起學原始碼-微服務】Hystrix 原始碼三:Hystrix核心流程:Hystix降級、熔斷等原理剖析

說明

原創不易,如若轉載 請標明來源!

歡迎關注本人微信公眾號:壹枝花算不算浪漫
更多內容也可檢視本人部落格:一枝花算不算浪漫

前言

前情回顧

上一講我們講解了Hystrix在配合feign的過程中,一個正常的請求邏輯該怎樣處理,這裡涉及到執行緒池的建立、HystrixCommand的執行等邏輯。

如圖所示:

高清大圖:https://www.processon.com/view/link/5e1c128ce4b0169fb51ce77e

本講目錄

這一講開始講解Hystrix的看家本領:熔斷+降級。
熔斷功能是Hystrix最核心的元件,當然也是最複雜的一塊。
原始碼中細節太多,本講我們主要還是專注於它的設計思想去學習。

目錄如下:

  1. HystrixCircuitBreaker初始化過程
  2. Hystrix熔斷機制(CLOSED/OPEN/HALF_OPEN)
  3. fallback降級機制

原始碼分析

HystrixCircuitBreaker初始化過程

我們還是會以AbstractCommand為突破口,這裡繼續看它的建構函式,其中裡面有初始化熔斷器initCircuitBreaker()的過程,具體程式碼如下:

abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R> {

    private static HystrixCircuitBreaker initCircuitBreaker(boolean enabled, HystrixCircuitBreaker fromConstructor,
                                                            HystrixCommandGroupKey groupKey, HystrixCommandKey commandKey,
                                                            HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
        if (enabled) {
            if (fromConstructor == null) {
                // 構建預設的HystrixCircuitBreaker
                return HystrixCircuitBreaker.Factory.getInstance(commandKey, groupKey, properties, metrics);
            } else {
                return fromConstructor;
            }
        } else {
            return new NoOpCircuitBreaker();
        }
    }
}


public interface HystrixCircuitBreaker {
    public static HystrixCircuitBreaker getInstance(HystrixCommandKey key, HystrixCommandGroupKey group, HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
        // circuitBreakersByCommand是一個map,key為commandKey,也就是FeignClient中定義的方法名
        // 類似於ServiceAFeignClient.sayHello(String)
        HystrixCircuitBreaker previouslyCached = circuitBreakersByCommand.get(key.name());
        if (previouslyCached != null) {
            return previouslyCached;
        }

        // 每個commandKey都對應著自己的熔斷器,如果沒有則會構造一個HystrixCircuitBreaker
        HystrixCircuitBreaker cbForCommand = circuitBreakersByCommand.putIfAbsent(key.name(), new HystrixCircuitBreakerImpl(key, group, properties, metrics));
        if (cbForCommand == null) {
            return circuitBreakersByCommand.get(key.name());
        } else {
            return cbForCommand;
        }
    }

    class HystrixCircuitBreakerImpl implements HystrixCircuitBreaker {
        private final HystrixCommandProperties properties;
        private final HystrixCommandMetrics metrics;

        private Subscription subscribeToStream() {
            // 對HealthCounts進行訂閱
            // HealthCounts中包含 總請求次數、總失敗次數、失敗率
            // HealthCounts 統計資料有變化則會回撥到這裡來
            return metrics.getHealthCountsStream()
                    .observe()
                    .subscribe(new Subscriber<HealthCounts>() {
                        @Override
                        public void onCompleted() {

                        }

                        @Override
                        public void onError(Throwable e) {

                        }

                        // 判斷是否要降級的核心邏輯
                        @Override
                        public void onNext(HealthCounts hc) {
                            // 一個時間視窗(預設10s鍾)總請求次數是否大於circuitBreakerRequestVolumeThreshold 預設為20s
                            if (hc.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {
                                
                            } else {
                                // 錯誤率(總錯誤次數/總請求次數)小於circuitBreakerErrorThresholdPercentage(預設50%)
                                if (hc.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {

                                } else {
                                    // 反之,熔斷狀態將從CLOSED變為OPEN,且circuitOpened==>當前時間戳
                                    if (status.compareAndSet(Status.CLOSED, Status.OPEN)) {
                                        circuitOpened.set(System.currentTimeMillis());
                                    }
                                }
                            }
                        }
                    });
        }
    }
}

上面就是熔斷器初始化過程,這裡面做了幾件事:

  1. 每個commandKey都有自己的一個熔斷器
    commandKey表現形式為:ServiceAFeignClient#sayHello(String)

  2. 如果commandKey不存在熔斷器,則構建預設熔斷器
    預設熔斷器會對HealthCounts進行訂閱。HealthCounts中包含時間視窗內(預設10s鍾)請求的總次數、失敗次數、失敗率

  3. HealthCounts中統計資料有變化則會回撥subscribe.onNext()方法進行熔斷開啟判斷

  4. 熔斷開啟條件:
  • 時間視窗內(預設10s鍾)總請求次數大於20次
  • 時間視窗內(預設10s鍾)失敗率大於50%
  • 滿足上述兩個條件後熔斷器狀態從CLOSED變成OPEN

熔斷器在第一次請求時會初始化AbtractCommand,同時也會建立對應commandKey的熔斷器 ,熔斷器預設都是關閉的(可配置為強制開啟),只有滿足觸發條件才會被開啟。下面就一起來看下熔斷、半開等狀態是如何觸發的吧。

Hystrix熔斷機制(CLOSED/OPEN/HALF_OPEN)

這裡我們以AbstractCommand.applyHystrixSemantics() 為入口,一步步往下探究,這個方法在上一講已經提到過,一個正常的Feign請求都會呼叫此方法。

abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R> {
    private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
        // 如果熔斷了,這這裡返回為false
        // 這裡也包含HALF_OPEN邏輯
        if (circuitBreaker.attemptExecution()) {
            final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() {
                @Override
                public void call(Throwable t) {
                    eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey);
                }
            };

            if (executionSemaphore.tryAcquire()) {
                try {
                    executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
                    return executeCommandAndObserve(_cmd)
                            .doOnError(markExceptionThrown)
                            .doOnTerminate(singleSemaphoreRelease)
                            .doOnUnsubscribe(singleSemaphoreRelease);
                } catch (RuntimeException e) {
                    return Observable.error(e);
                }
            } else {
                return handleSemaphoreRejectionViaFallback();
            }
        } else {
            return handleShortCircuitViaFallback();
        }
    }
}

circuitBreaker.attemptExecution() 這個邏輯就是判斷,如果熔斷了,那麼返回false。而且這裡還包含HALF_OPEN的邏輯,我們先看如何觸發熔斷的,這個後面再接著看。

接著往下跟進executeCommandAndObserve() 方法:

abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R> {
    private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
        final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread();

        // 省略部分程式碼...

        // 執行過程中,出現異常等都會進入此回撥函式
        final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {
            @Override
            public Observable<R> call(Throwable t) {
                circuitBreaker.markNonSuccess();
                Exception e = getExceptionFromThrowable(t);
                executionResult = executionResult.setExecutionException(e);
                if (e instanceof RejectedExecutionException) {
                    return handleThreadPoolRejectionViaFallback(e);
                } else if (t instanceof HystrixTimeoutException) {
                    return handleTimeoutViaFallback();
                } else if (t instanceof HystrixBadRequestException) {
                    return handleBadRequestByEmittingError(e);
                } else {
                    /*
                     * Treat HystrixBadRequestException from ExecutionHook like a plain HystrixBadRequestException.
                     */
                    if (e instanceof HystrixBadRequestException) {
                        eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey);
                        return Observable.error(e);
                    }

                    return handleFailureViaFallback(e);
                }
            }
        };

        Observable<R> execution;
        if (properties.executionTimeoutEnabled().get()) {
            // 這裡建立一個 HystrixObservableTimeoutOperator
            execution = executeCommandWithSpecifiedIsolation(_cmd)
                    .lift(new HystrixObservableTimeoutOperator<R>(_cmd));
        } else {
            execution = executeCommandWithSpecifiedIsolation(_cmd);
        }

        return execution.doOnNext(markEmits)
                .doOnCompleted(markOnCompleted)
                .onErrorResumeNext(handleFallback)
                .doOnEach(setRequestContext);
    }
}

當我們服務呼叫中出現異常都會進入handleFallback()中,裡面的方法我們就不繼續跟入了,猜測裡面會有更新HealthCounts中的屬性,然後觸發 HystrixCircuitBreaker中的onNext()方法,當滿足熔斷條件時 則會將熔斷狀態從CLOSED變成OPEN

這裡我們會跟進下HystrixObservableTimeoutOperator 程式碼,這個是對我們執行過程中判斷是否超時。
上面程式碼中,執行executeCommandWithSpecifiedIsolation() 方法時也會建立一個超時監視器:

private static class HystrixObservableTimeoutOperator<R> implements Operator<R, R> {

    final AbstractCommand<R> originalCommand;

    public HystrixObservableTimeoutOperator(final AbstractCommand<R> originalCommand) {
        this.originalCommand = originalCommand;
    }

    @Override
    public Subscriber<? super R> call(final Subscriber<? super R> child) {
        TimerListener listener = new TimerListener() {

            @Override
            public void tick() {
                // 判斷command的timeOut狀態,如果是未執行狀態,則更新為已超時
                if (originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.TIMED_OUT)) {
                    originalCommand.eventNotifier.markEvent(HystrixEventType.TIMEOUT, originalCommand.commandKey);
                    s.unsubscribe();

                    final HystrixContextRunnable timeoutRunnable = new HystrixContextRunnable(originalCommand.concurrencyStrategy, hystrixRequestContext, new Runnable() {
                        @Override
                        public void run() {
                            child.onError(new HystrixTimeoutException());
                        }
                    });


                    timeoutRunnable.run();
                }
            }

            @Override
            public int getIntervalTimeInMilliseconds() {
                return originalCommand.properties.executionTimeoutInMilliseconds().get();
            }
        };

        final Reference<TimerListener> tl = HystrixTimer.getInstance().addTimerListener(listener);
        originalCommand.timeoutTimer.set(tl);

        // 省略部分程式碼...
        s.add(parent);

        return parent;
    }
}

public class HystrixTimer {
    public Reference<TimerListener> addTimerListener(final TimerListener listener) {
        startThreadIfNeeded();
        Runnable r = new Runnable() {
            @Override
            public void run() {
                try {
                    // 執行上面的tick方法,改變command timeout狀態
                    listener.tick();
                } catch (Exception e) {
                    logger.error("Failed while ticking TimerListener", e);
                }
            }
        };

        // 執行排程任務,延遲載入,延遲時間和排程時間預設都為1s鍾
        // 這裡使用執行緒池,coreSize=cpu核心數 maxSize為Integer.Max
        ScheduledFuture<?> f = executor.get().getThreadPool().scheduleAtFixedRate(r, listener.getIntervalTimeInMilliseconds(), listener.getIntervalTimeInMilliseconds(), TimeUnit.MILLISECONDS);
        return new TimerReference(listener, f);
    }
}

這裡面核心業務是起一個排程任務,預設每秒鐘執行一次,然後呼叫tick()方法,如果當前command狀態還是NOT_EXECUTED狀態,那麼將command狀態改為TIMED_OUT 。此時會進入到之前的handleFallback回撥函式中,這裡又會更新HealthCounts中的資料,對應的觸發之前熔斷的判斷條件:

protected HystrixCircuitBreakerImpl(HystrixCommandKey key, HystrixCommandGroupKey commandGroup, final HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
  this.properties = properties;
  this.metrics = metrics;

  //On a timer, this will set the circuit between OPEN/CLOSED as command executions occur
  Subscription s = subscribeToStream();
  activeSubscription.set(s);
}

private Subscription subscribeToStream() {
  //這裡會在每次執行onNext()事件的時候來評估是否需要開啟或者關閉斷路器
  return metrics.getHealthCountsStream()
    .observe()
    .subscribe(new Subscriber<HealthCounts>() {
      @Override
      public void onCompleted() {

      }

      @Override
      public void onError(Throwable e) {

      }

      @Override
      public void onNext(HealthCounts hc) {
        //首先校驗的時在時間窗範圍內的請求次數,如果低於閾值(預設是20),不做處理,如果高於閾值,則去判斷介面請求的錯誤率
        if (hc.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {           // 如果沒有超過統計閾值的最低視窗值,就沒有必要去改變斷路器的狀態
          // 當前如果斷路器是關閉的,那麼就保持關閉狀態無需更改;
          // 如果斷路器狀態為半開狀態,需要等待直到有成功的命令執行;
          // 如果斷路器是開啟狀態,需要等待休眠視窗過期。
        } else {
          //判斷介面請求的錯誤率(閾值預設是50),如果高於這個值,則斷路器開啟
          if (hc.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {
    
            // 如果當前請求的錯誤率小於斷路器設定的容錯率百分比,也不會攔截請求
          } else {
            // 如果當前錯誤率太高則開啟斷路器
            if (status.compareAndSet(Status.CLOSED, Status.OPEN)) {
              circuitOpened.set(System.currentTimeMillis());
            }
          }
        }
      }
    });
}

如果符合熔斷條件,那麼command熔斷狀態就會變為OPEN,此時熔斷器開啟。

如果我們command執行成功,那麼就會清理掉這個timeout timer schedule任務。

abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R> {
    private void handleCommandEnd(boolean commandExecutionStarted) {
        Reference<TimerListener> tl = timeoutTimer.get();
        // 如果timeOutTimer不為空,這裡則clear一下
        // clear會關閉啟動的排程任務
        if (tl != null) {
            tl.clear();
        }

        long userThreadLatency = System.currentTimeMillis() - commandStartTimestamp;
        executionResult = executionResult.markUserThreadCompletion((int) userThreadLatency);
        if (executionResultAtTimeOfCancellation == null) {
            // metrics統計資料
            metrics.markCommandDone(executionResult, commandKey, threadPoolKey, commandExecutionStarted);
        } else {
            metrics.markCommandDone(executionResultAtTimeOfCancellation, commandKey, threadPoolKey, commandExecutionStarted);
        }

        if (endCurrentThreadExecutingCommand != null) {
            endCurrentThreadExecutingCommand.call();
        }
    }
}

如上所屬,我們已經知道了熔斷開啟的觸發時機,那麼如果一個commandKey開啟了熔斷,下次的請求是該如何直接降級呢?我們來看下程式碼:

abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R> {
    private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {

        // 這個if條件就代表是否開啟熔斷
        if (circuitBreaker.attemptExecution()) {
            // 執行業務邏輯程式碼...
        } else {
            return handleShortCircuitViaFallback();
        }
    }
}

class HystrixCircuitBreakerImpl implements HystrixCircuitBreaker {
    public boolean attemptExecution() {
            // 如果熔斷配置的為強制開啟,那麼直接返回false執行熔斷邏輯
            if (properties.circuitBreakerForceOpen().get()) {
                return false;
            }
            // 如果熔斷配置為強制關閉,那麼永遠不走熔斷邏輯
            if (properties.circuitBreakerForceClosed().get()) {
                return true;
            }
            // 熔斷開啟時 circuitOpened設定為當前時間戳
            if (circuitOpened.get() == -1) {
                return true;
            } else {
                // 如果當前時間距離熔斷小於5s鍾,那麼將熔斷狀態從OPEN改為HALF_OPEN
                if (isAfterSleepWindow()) {
                    if (status.compareAndSet(Status.OPEN, Status.HALF_OPEN)) {
                        //only the first request after sleep window should execute
                        return true;
                    } else {
                        return false;
                    }
                } else {
                    return false;
                }
            }
        }
    }

    private boolean isAfterSleepWindow() {
        final long circuitOpenTime = circuitOpened.get();
        final long currentTime = System.currentTimeMillis();
        // circuitBreakerSleepWindowInMilliseconds 預設為5s鍾
        final long sleepWindowTime = properties.circuitBreakerSleepWindowInMilliseconds().get();
        // 當前熔斷距離熔斷是否超過5s鍾
        return currentTime > circuitOpenTime + sleepWindowTime;
    }
}

我們可以看到,在applyHystrixSemantics()這個核心的方法中,先判斷是否熔斷,如果熔斷則直接走fallback邏輯。

attemptExecution()判斷條件中還涉及到HALF_OPEN的邏輯,如果熔斷開啟,下一次請求的時候,會判斷當前時間距離上一次時間是否超過了5s鍾,如果沒有超過,則會將熔斷狀態從OPEN變為HALF_OPEN,此時會放一個請求按照正常邏輯去執行:

  1. 執行失敗,熔斷狀態又會從HALF_OPEN變成OPEN
  2. 執行成功,熔斷狀態從HALF_OPEN變成CLOSED,並清除熔斷相關設定

執行成功後代碼:

class HystrixCircuitBreakerImpl implements HystrixCircuitBreaker {
    public void markSuccess() {
        if (status.compareAndSet(Status.HALF_OPEN, Status.CLOSED)) {
            //This thread wins the race to close the circuit - it resets the stream to start it over from 0
            metrics.resetStream();
            Subscription previousSubscription = activeSubscription.get();
            if (previousSubscription != null) {
                previousSubscription.unsubscribe();
            }
            Subscription newSubscription = subscribeToStream();
            activeSubscription.set(newSubscription);
            circuitOpened.set(-1L);
        }
    }
}

上面對整個熔斷的狀態:CLOSED、OPEN、HALF_OPEN梳理的已經很清楚了,下面看看降級是該如何處理的吧。

fallback降級機制

上面已經講解了Hystrix 熔斷開啟的機制等內容,這裡主要是說如果一個請求失敗(執行緒池拒絕、超時、badRequest等),那麼Hystrix是如何執行降級的呢?

還是回到我們最初的程式碼 HystrixInvocationHandler類中,看看其invoke()方法中的getFallback回撥函式:

protected Object getFallback() {
    if (fallbackFactory == null) {
      return super.getFallback();
    }
    try {
      // 通過我們配置好的fallbackFactory找到對應的FeignClient,這裡是獲取ServiceAFeignClient
      Object fallback = fallbackFactory.create(getExecutionException());
      // fallbackMap中key為ServiceAFeignClient.sayHello(Integer)
      // 獲取具體的降級method方法
      Object result = fallbackMethodMap.get(method).invoke(fallback, args);
      if (isReturnsHystrixCommand(method)) {
        return ((HystrixCommand) result).execute();
      } else if (isReturnsObservable(method)) {
        // Create a cold Observable
        return ((Observable) result).toBlocking().first();
      } else if (isReturnsSingle(method)) {
        // Create a cold Observable as a Single
        return ((Single) result).toObservable().toBlocking().first();
      } else if (isReturnsCompletable(method)) {
        ((Completable) result).await();
        return null;
      } else {
        return result;
      }
    } catch (IllegalAccessException e) {
      // shouldn't happen as method is public due to being an interface
      throw new AssertionError(e);
    } catch (InvocationTargetException e) {
      // Exceptions on fallback are tossed by Hystrix
      throw new AssertionError(e.getCause());
    }
  }
};

這裡很簡單,其實就是先獲取到我們自己在FallbackFactory中配置的的降級方法,然後執行降級邏輯。

總結

這一講核心邏輯主要是Hystrix熔斷狀態的變化,主要是CLOSED、OPEN、HALF_OPEN幾種狀態觸發的時間,互相轉變的流程,以及執行降級邏輯的原理。

我們仍然是用一個流程圖來總結一下:

高清大圖連結:
https://www.processon.com/view/link/5e1ee0afe4b0c62462aae684

(點選原文可以直接檢視大圖哦