
Akka Source Code Analysis: CircuitBreaker (the circuit breaker)


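  The circuit breaker is a technique that shows up in many technology stacks. It provides a stable mechanism for stopping cascading failures in a distributed system.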

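  What does that mean? In a distributed environment, when one compute node has a problem, the failure can easily propagate back to its callers or even trigger an avalanche across the whole system. For example, suppose service A calls several other services in turn, one of which is service B. If B slows down for some reason, so that a call that used to finish in 100 ms now takes 1 s, then every call A makes to B also waits 1 s. Many of A's calls end up waiting at the same time, and under high concurrency the node running A can easily get into trouble. In other words, B's problem has been passed on to A. The circuit breaker exists to solve exactly this problem.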

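  Put a circuit breaker between service A and service B, and have A call B through it. The breaker uses some algorithm to judge whether B is healthy. If B is not, A's calls fail immediately instead of waiting 1 s, and B is not called at all. This saves A the time it would spend discovering the failure and also takes load off B. Once B is healthy again, A can call it normally.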

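  I won't go over the principles and usage of circuit breakers in detail here; readers can refer to the blog post listed in the references at the end. Here I only analyze how Akka implements its circuit breaker.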

  As usual, let's start from Akka's official demo.

import akka.actor.{ Actor, ActorLogging }
import akka.pattern.{ CircuitBreaker, pipe }
import scala.concurrent.Future
import scala.concurrent.duration._

class DangerousActor extends Actor with ActorLogging {
  // The actor's dispatcher is the ExecutionContext for Future(...), pipeTo and the breaker
  import context.dispatcher

  val breaker =
    new CircuitBreaker(
      context.system.scheduler,
      maxFailures = 5,
      callTimeout = 10.seconds,
      resetTimeout = 1.minute).onOpen(notifyMeOnOpen())

  def notifyMeOnOpen(): Unit =
    log.warning("My CircuitBreaker is now open, and will not close for one minute")

  def dangerousCall: String = "This really isn't that dangerous of a call after all"

  def receive = {
    // Asynchronous: the protected Future's result (or failure) is piped back to the sender
    case "is my middle name" =>
      breaker.withCircuitBreaker(Future(dangerousCall)) pipeTo sender()
    // Synchronous: blocks the actor's thread until the call completes
    case "block for me" =>
      sender() ! breaker.withSyncCircuitBreaker(dangerousCall)
  }
}

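  Let's start with how a CircuitBreaker is created. The demo passes four arguments, and we focus on the last three: the maximum number of failures (maxFailures), the call timeout (callTimeout), and the reset timeout (resetTimeout). Note that the primary constructor shown below also takes maxResetTimeout and exponentialBackoffFactor, which govern exponential backoff of the reset timeout; the demo uses a shorter auxiliary constructor that omits them.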

/**
 * Provides circuit breaker functionality to provide stability when working with "dangerous" operations, e.g. calls to
 * remote systems
 *
 * Transitions through three states:
 * - In *Closed* state, calls pass through until the `maxFailures` count is reached.  This causes the circuit breaker
 * to open.  Both exceptions and calls exceeding `callTimeout` are considered failures.
 * - In *Open* state, calls fail-fast with an exception.  After `resetTimeout`, circuit breaker transitions to
 * half-open state.
 * - In *Half-Open* state, the first call will be allowed through, if it succeeds the circuit breaker will reset to
 * closed state.  If it fails, the circuit breaker will re-open to open state.  All calls beyond the first that
 * execute while the first is running will fail-fast with an exception.
 *
 * @param scheduler Reference to Akka scheduler
 * @param maxFailures Maximum number of failures before opening the circuit
 * @param callTimeout [[scala.concurrent.duration.FiniteDuration]] of time after which to consider a call a failure
 * @param resetTimeout [[scala.concurrent.duration.FiniteDuration]] of time after which to attempt to close the circuit
 * @param executor [[scala.concurrent.ExecutionContext]] used for execution of state transition listeners
 */
class CircuitBreaker(
  scheduler:                Scheduler,
  maxFailures:              Int,
  callTimeout:              FiniteDuration,
  val resetTimeout:         FiniteDuration,
  maxResetTimeout:          FiniteDuration,
  exponentialBackoffFactor: Double)(implicit executor: ExecutionContext) extends AbstractCircuitBreaker

  Using it is simple: pass the code you want to protect to withCircuitBreaker or withSyncCircuitBreaker.

  /**
   * Wraps invocations of asynchronous calls that need to be protected
   *
   * @param body Call needing protected
   * @return [[scala.concurrent.Future]] containing the call result or a
   *   `scala.concurrent.TimeoutException` if the call timed out
   *
   */
  def withCircuitBreaker[T](body: => Future[T]): Future[T] = currentState.invoke(body, CircuitBreaker.exceptionAsFailure)

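  The parameter of withCircuitBreaker is body, a by-name parameter (call-by-name): the argument is not evaluated at the call site but each time the method refers to it, so you can loosely think of it as a function pointer. The code is simple: it just calls currentState.invoke, passing CircuitBreaker.exceptionAsFailure as the rule that decides which results count as failures. So what is currentState? Remember the three states of a circuit breaker: Open, Closed, and Half-Open. A newly created breaker is in the Closed state, so on the first call body is executed normally.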
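  To make the by-name semantics concrete, here is a minimal standalone sketch in plain Scala (no Akka). The names ByNameDemo, failFast and passThrough are hypothetical stand-ins for an Open and a Closed breaker. A by-name argument is evaluated only if and when the callee uses it, which is what later lets the Open state skip the protected call entirely.

```scala
object ByNameDemo {
  // Counts how many times the by-name argument has actually been evaluated
  var evaluations = 0

  def expensive(): Int = { evaluations += 1; 42 }

  // Hypothetical stand-in for an Open breaker: never touches `body`
  def failFast[T](body: => T): Option[T] = None

  // Hypothetical stand-in for a Closed breaker: evaluates `body` exactly once
  def passThrough[T](body: => T): Option[T] = Some(body)
}
```

  Calling ByNameDemo.failFast(ByNameDemo.expensive()) leaves the counter at 0, while passThrough evaluates the argument once.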

  /**
   * Helper method for accessing underlying state via Unsafe
   *
   * @return Reference to current state
   */
  @inline
  private[this] def currentState: State =
    Unsafe.instance.getObjectVolatile(this, AbstractCircuitBreaker.stateOffset).asInstanceOf[State]

  currentState is of type State.


  State has three implementations: Closed, HalfOpen, and Open.

   /**
     * Implementation of invoke, which simply attempts the call
     *
     * @param body Implementation of the call that needs protected
     * @return Future containing result of protected call
     */
    override def invoke[T](body: => Future[T], defineFailureFn: Try[T] => Boolean): Future[T] =
      callThrough(body, defineFailureFn)

  Let's look at Closed's invoke implementation first. It is very simple: it calls callThrough, which, as the name suggests, invokes body directly.

/**
     * Shared implementation of call across all states.  Thrown exception or execution of the call beyond the allowed
     * call timeout is counted as a failed call, otherwise a successful call
     *
     * @param body Implementation of the call
     * @param defineFailureFn function that define what should be consider failure and thus increase failure count
     * @return Future containing the result of the call
     */
    def callThrough[T](body: => Future[T], defineFailureFn: Try[T] => Boolean): Future[T] = {

      def materialize[U](value: => Future[U]): Future[U] = try value catch { case NonFatal(t) => Future.failed(t) }

      if (callTimeout == Duration.Zero) {
        val start = System.nanoTime()
        val f = materialize(body)

        f.onComplete {
          case s: Success[_] =>
            notifyCallSuccessListeners(start)
            callSucceeds()
          case Failure(ex) =>
            notifyCallFailureListeners(start)
            callFails()
        }

        f
      } else {
        val start = System.nanoTime()
        val p = Promise[T]()

        implicit val ec = sameThreadExecutionContext

        p.future.onComplete { fResult =>
          if (defineFailureFn(fResult)) {
            callFails()
          } else {
            notifyCallSuccessListeners(start)
            callSucceeds()
          }
        }

        val timeout = scheduler.scheduleOnce(callTimeout) {
          if (p tryFailure timeoutEx) {
            notifyCallTimeoutListeners(start)
          }
        }

        materialize(body).onComplete {
          case Success(result) =>
            p.trySuccess(result)
            timeout.cancel
          case Failure(ex) =>
            if (p.tryFailure(ex)) {
              notifyCallFailureListeners(start)
            }
            timeout.cancel
        }
        p.future
      }
    }

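  callThrough handles two cases depending on callTimeout. If callTimeout is Duration.Zero, no timeout is enforced and the returned Future completes whenever body's Future does; otherwise a timeout is enforced. We only analyze the case with a timeout. The no-timeout branch is much the same; the timeout version merely adds a Promise.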

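  It first creates a Promise and registers a completion handler on its future: if defineFailureFn classifies the result as a failure, callFails is called; otherwise notifyCallSuccessListeners and callSucceeds are called.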

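  materialize is simple: it evaluates body and turns any non-fatal exception thrown during that evaluation into a failed Future. When body's Future succeeds, the Promise is completed successfully; when it fails, the Promise is failed. But what if body keeps running past the timeout?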

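  That is where scheduler.scheduleOnce comes in. It starts a timer; when callTimeout elapses, it tries to fail the Promise with timeoutEx and, if that succeeds, calls notifyCallTimeoutListeners. Note, however, that a timeout does not stop body: it keeps running until it finishes on its own, because a Scala Future cannot be cancelled. When body eventually completes, trySuccess or tryFailure on the already completed Promise simply returns false, so the late result is discarded.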
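  The same promise race can be reproduced outside Akka. The sketch below is a simplified, hypothetical version: a java.util.concurrent ScheduledExecutorService stands in for Akka's Scheduler, failure counting and listeners are omitted, and TimeoutRace and withTimeout are made-up names. It reuses the materialize idea so that a body that throws before producing a Future still yields a failed Future, and it shows why a late result is harmless and why a timeout cannot stop body.

```scala
import java.util.concurrent.{ Executors, ThreadFactory, TimeoutException }
import scala.concurrent.{ Future, Promise }
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.control.NonFatal

object TimeoutRace {
  // Hypothetical stand-in for Akka's Scheduler: a single daemon timer thread
  private val timer = Executors.newSingleThreadScheduledExecutor(new ThreadFactory {
    def newThread(r: Runnable): Thread = {
      val t = new Thread(r, "timeout-race-timer")
      t.setDaemon(true)
      t
    }
  })

  // Same shape as materialize in callThrough: a non-fatal exception thrown while
  // evaluating the by-name body becomes a failed Future instead of escaping
  def materialize[U](value: => Future[U]): Future[U] =
    try value catch { case NonFatal(t) => Future.failed(t) }

  // Races `body` against a timer; whichever completes the promise first wins
  def withTimeout[T](timeout: FiniteDuration)(body: => Future[T]): Future[T] = {
    val p = Promise[T]()
    val task = timer.schedule(new Runnable {
      def run(): Unit = { p.tryFailure(new TimeoutException(s"call timed out after $timeout")); () }
    }, timeout.toMillis, MILLISECONDS)
    materialize(body).onComplete { result =>
      p.tryComplete(result) // returns false (ignored) if the timer already failed p
      task.cancel(false)    // stops the timer only; it cannot stop `body` itself
    }
    p.future
  }
}
```

  Failing the promise from the timer, rather than waiting on body, is what lets the caller see the failure on time even though body itself keeps running in the background.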

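  For simplicity, first consider a call that succeeds without timing out. Following the source, notifyCallSuccessListeners(start) and callSucceeds() are executed. I won't cover the notify*Listeners methods; they only invoke callbacks registered by the developer.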

 /**
     * On successful call, the failure count is reset to 0
     *
     * @return
     */
    override def callSucceeds(): Unit = set(0)

  Closed's callSucceeds is almost too simple: it just calls set. What does set do? Let's look at how Closed is defined.

private object Closed extends AtomicInteger with State

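  Closed is not only a State but also an AtomicInteger; in other words, it doubles as a thread-safe counter. A successful call resets the counter to 0, so a failure presumably increments it.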

 /**
     * On failed call, the failure count is incremented.  The count is checked against the configured maxFailures, and
     * the breaker is tripped if we have reached maxFailures.
     *
     * @return
     */
    override def callFails(): Unit = if (incrementAndGet() == maxFailures) tripBreaker(Closed)

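  Here is callFails: it increments the failure count by one and checks whether it has reached maxFailures; if so, it calls tripBreaker. Because the check is `==` rather than `>=`, only the call whose increment hits the limit exactly trips the breaker.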
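  The counting logic is easy to isolate. The sketch below is hypothetical (FailureCounter and onTrip are made-up names, with onTrip standing in for tripBreaker(Closed)), but it follows the same idea of making the state object itself an AtomicInteger. Because a success calls set(0), only consecutive failures count toward maxFailures.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical sketch of Closed's counter; `onTrip` stands in for tripBreaker(Closed)
final class FailureCounter(maxFailures: Int, onTrip: () => Unit) extends AtomicInteger(0) {
  // A success wipes out earlier failures
  def callSucceeds(): Unit = set(0)

  // `==` rather than `>=`: only the increment that lands exactly on the limit trips
  def callFails(): Unit = if (incrementAndGet() == maxFailures) onTrip()
}
```

  With maxFailures = 3, the sequence fail, fail, succeed, fail, fail does not trip; one more failure trips once, and further failures do not trip again.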

/**
   * Trips breaker to an open state.  This is valid from Closed or Half-Open states.
   *
   * @param fromState State we're coming from (Closed or Half-Open)
   */
  private def tripBreaker(fromState: State): Unit = transition(fromState, Open)

/**
   * Implements consistent transition between states. Throws IllegalStateException if an invalid transition is attempted.
   *
   * @param fromState State being transitioning from
   * @param toState State being transitioning from
   */
  private def transition(fromState: State, toState: State): Unit = {
    if (swapState(fromState, toState))
      toState.enter()
    // else some other thread already swapped state
  }

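  In other words, once the maximum number of failures is reached, the current state is swapped to Open and the new state's enter method is called. swapState is a compare-and-swap, so if several threads race to make the same transition, only one succeeds and runs enter; for the others nothing happens, as the comment in transition notes.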
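  Here is a hypothetical sketch of that compare-and-swap transition. Akka accesses the state field through Unsafe (see currentState above); this version swaps in a java.util.concurrent.atomic.AtomicReference, which gives the same only-one-winner guarantee. Machine, onEnter, and the case objects are illustrative names, not Akka's.

```scala
import java.util.concurrent.atomic.AtomicReference

object CasTransition {
  sealed trait State
  case object Closed extends State
  case object Open extends State
  case object HalfOpen extends State

  // Hypothetical simplification of swapState + transition. compareAndSet succeeds
  // for exactly one caller; only that caller runs the target state's entry logic.
  final class Machine(onEnter: State => Unit) {
    private val state = new AtomicReference[State](Closed)

    def current: State = state.get

    def transition(from: State, to: State): Boolean =
      if (state.compareAndSet(from, to)) { onEnter(to); true }
      else false // some other thread already swapped the state
  }
}
```

  Since the case objects are singletons, the reference comparison inside compareAndSet is exactly the identity check needed here.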

 /**
     * On entering this state, schedule an attempted reset via [[akka.actor.Scheduler]] and store the entry time to
     * calculate remaining time before attempted reset.
     *
     * @return
     */
    override def _enter(): Unit = {
      set(System.nanoTime())
      scheduler.scheduleOnce(currentResetTimeout) {
        attemptReset()
      }
      val nextResetTimeout = currentResetTimeout * exponentialBackoffFactor match {
        case f: FiniteDuration => f
        case _                 => currentResetTimeout
      }

      if (nextResetTimeout < maxResetTimeout)
        swapResetTimeout(currentResetTimeout, nextResetTimeout)
    }

  Above is Open's enter implementation (the _enter hook). The first thing it does is call set. What is set here?

 private object Open extends AtomicLong with State 

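  From the definition, set is a method of AtomicLong; here it records the time (System.nanoTime()) at which the breaker entered Open. Next it starts a timer that calls attemptReset after currentResetTimeout. Finally it computes the next reset timeout by multiplying the current one by exponentialBackoffFactor, and stores it only if it is still below maxResetTimeout, so repeated trips back off exponentially. So what does attemptReset do when the timer fires?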
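  The backoff arithmetic at the end of _enter can be isolated into a small helper. This is a hypothetical standalone function (BackoffSketch.nextResetTimeout is not an Akka API) that mirrors the logic quoted above: multiply by the factor, fall back to the current value if the product is not finite, and adopt the new value only while it stays strictly below the maximum.

```scala
import scala.concurrent.duration._

object BackoffSketch {
  // Hypothetical helper mirroring the arithmetic at the end of Open's _enter
  def nextResetTimeout(current: FiniteDuration, factor: Double, max: FiniteDuration): FiniteDuration = {
    // FiniteDuration * Double yields a Duration, which may be infinite, hence the match
    val next = (current * factor) match {
      case f: FiniteDuration => f
      case _                 => current
    }
    // Only adopt the new value while it stays strictly below the maximum
    if (next < max) next else current
  }
}
```

  Note a consequence of the `<` check: starting from 8 minutes with a factor of 2 and a maximum of 10 minutes, the candidate 16 minutes is rejected, so the timeout stays at 8 minutes rather than being clamped to 10.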

/**
   * Attempts to reset breaker by transitioning to a half-open state.  This is valid from an Open state only.
   *
   */
  private def attemptReset(): Unit = transition(Open, HalfOpen)

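  So attemptReset simply moves the breaker into the Half-Open state. Before looking at HalfOpen, let's see what invoke does once the breaker has tripped to Open because the number of failures in the Closed state reached the limit.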

/**
     * Fail-fast on any invocation
     *
     * @param body Implementation of the call that needs protected
     * @return Future containing result of protected call
     */
    override def invoke[T](body: => Future[T], defineFailureFn: Try[T] => Boolean): Future[T] = {
      notifyCallBreakerOpenListeners()
      Future.failed(new CircuitBreakerOpenException(remainingDuration()))
    }

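  Open's invoke is trivial: it notifies the listeners and immediately returns a failed Future carrying a CircuitBreakerOpenException with the time remaining before the reset attempt. body is never evaluated. In other words, while the breaker is Open, every call fails fast until the reset timer moves it to HalfOpen.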
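  A minimal sketch of the Open state's behavior, assuming the remaining time is derived from the entry timestamp stored by set(System.nanoTime()) and a fixed reset timeout. OpenState, BreakerOpenException, and this remainingDuration formula are hypothetical; Akka's own remainingDuration() and CircuitBreakerOpenException may differ in detail.

```scala
import java.util.concurrent.atomic.AtomicLong
import scala.concurrent.Future
import scala.concurrent.duration._

// Hypothetical stand-in for akka.pattern.CircuitBreakerOpenException
final class BreakerOpenException(val remaining: FiniteDuration)
  extends RuntimeException(s"Circuit breaker is open; retry in $remaining")

// Hypothetical Open state: like Akka's Open, the object itself is an AtomicLong
// holding the System.nanoTime() at which the breaker opened
final class OpenState(resetTimeout: FiniteDuration) extends AtomicLong {
  def enter(): Unit = set(System.nanoTime())

  // Time left until the reset attempt, never negative (assumed formula)
  def remainingDuration(): FiniteDuration = {
    val left = resetTimeout.toNanos - (System.nanoTime() - get())
    if (left > 0) left.nanos else Duration.Zero
  }

  // `body` is by-name and is never evaluated: every call fails fast
  def invoke[T](body: => Future[T]): Future[T] =
    Future.failed(new BreakerOpenException(remainingDuration()))
}
```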

/**
     * On entry, guard should be reset for that first call to get in
     *
     * @return
     */
    override def _enter(): Unit = set(true)

  Entering the Half-Open state also calls set, this time with true, so HalfOpen is presumably an AtomicBoolean.

/**
     * Allows a single call through, during which all other callers fail-fast.  If the call fails, the breaker reopens.
     * If the call succeeds the breaker closes.
     *
     * @param body Implementation of the call that needs protected
     * @return Future containing result of protected call
     */
    override def invoke[T](body: => Future[T], defineFailureFn: Try[T] => Boolean): Future[T] =
      if (compareAndSet(true, false))
        callThrough(body, defineFailureFn)
      else {
        notifyCallBreakerOpenListeners()
        Future.failed[T](new CircuitBreakerOpenException(0.seconds))
      }

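  Now let's look at invoke. compareAndSet(true, false) succeeds only if the current value is true, and atomically flips it to false. The flag is true only for the first call after entering HalfOpen. So after the transition from Open to HalfOpen, only that first invoke runs callThrough (and therefore body); any other call made while the breaker is still HalfOpen fails immediately.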
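  The single-probe gate can be sketched with a plain AtomicBoolean. HalfOpenGate is a hypothetical name, and it fails extra callers with an IllegalStateException where Akka uses CircuitBreakerOpenException(0.seconds).

```scala
import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.Future

// Hypothetical sketch of the HalfOpen gate: only one probe call gets through
final class HalfOpenGate extends AtomicBoolean(true) {
  // Called on entering HalfOpen: re-arm the gate for the next probe
  def enter(): Unit = set(true)

  def invoke[T](body: => Future[T]): Future[T] =
    if (compareAndSet(true, false)) body // the first caller wins the CAS and runs the probe
    else Future.failed(new IllegalStateException("half-open: probe already in flight"))
}
```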

  Recall that callThrough calls callSucceeds when body succeeds and callFails when it fails. Here are HalfOpen's implementations of these two methods.

    /**
     * Reset breaker on successful call.
     *
     * @return
     */
    override def callSucceeds(): Unit = resetBreaker()

    /**
     * Reopen breaker on failed call.
     *
     * @return
     */
    override def callFails(): Unit = tripBreaker(HalfOpen)

  /**
   * Resets breaker to a closed state.  This is valid from an Half-Open state only.
   *
   */
  private def resetBreaker(): Unit = transition(HalfOpen, Closed)

  Start with success: following the calls in the source, the breaker simply moves from HalfOpen to Closed, a state we have already analyzed. And how is failure handled?

  /**
   * Trips breaker to an open state.  This is valid from Closed or Half-Open states.
   *
   * @param fromState State we're coming from (Closed or Half-Open)
   */
  private def tripBreaker(fromState: State): Unit = transition(fromState, Open)

  On failure the breaker goes back to Open, which we have also analyzed already.

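  That completes the analysis of Akka's circuit breaker. Pretty simple, isn't it? In short: the breaker starts out Closed and user code runs normally. Once consecutive failures (timeouts count as failures) reach maxFailures, it moves to Open. In Open, user code is not run and every call fails immediately; after the reset timeout it moves to HalfOpen. In HalfOpen, if the first call succeeds the breaker returns to Closed; if it fails, the breaker reopens and waits for the reset timeout again.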
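  To tie the transitions together, here is a deliberately simplified, single-threaded model of the state machine summarized above. It is not Akka's implementation: SimpleBreaker is hypothetical, there are no timers (the reset timeout is checked lazily on the next call against an injected clock), and callTimeout, exponential backoff, listeners, and the HalfOpen concurrency gate are all omitted.

```scala
import scala.util.{ Failure, Success, Try }

object SimpleBreaker {
  sealed trait State
  case object Closed extends State
  case object Open extends State
  case object HalfOpen extends State
}

// Hypothetical, single-threaded model of the Closed -> Open -> HalfOpen cycle.
// No timers: the reset timeout is checked lazily against an injected clock (nanos).
final class SimpleBreaker(maxFailures: Int, resetTimeoutNanos: Long, clock: () => Long) {
  import SimpleBreaker._

  private var state: State = Closed
  private var failures = 0
  private var openedAt = 0L

  def currentState: State = { attemptResetIfDue(); state }

  // Open -> HalfOpen once the reset timeout has elapsed
  private def attemptResetIfDue(): Unit =
    if (state == Open && clock() - openedAt >= resetTimeoutNanos) state = HalfOpen

  // Closed/HalfOpen -> Open, remembering when the breaker opened
  private def trip(): Unit = { state = Open; openedAt = clock() }

  def call[T](body: => T): Try[T] = {
    attemptResetIfDue()
    val from = state
    if (from == Open) Failure(new IllegalStateException("circuit open: failing fast"))
    else {
      val result = Try(body) // non-fatal exceptions become Failure
      result match {
        case Success(_) =>
          failures = 0
          state = Closed // a successful HalfOpen probe closes the breaker
        case Failure(_) =>
          if (from == HalfOpen) trip()
          else { failures += 1; if (failures == maxFailures) trip() }
      }
      result
    }
  }
}
```

  Injecting the clock keeps the model deterministic, which is convenient for walking through the transitions step by step; a real breaker, like Akka's, schedules the reset instead.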

(Figure: circuit breaker state diagram from the official Akka documentation.)

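  In fact, a single diagram from the official documentation explains all of this. Someone asked how withSyncCircuitBreaker differs from withCircuitBreaker. Honestly, not much: one is synchronous and the other is asynchronous. The synchronous version runs body on the caller's thread, wraps the outcome in an already completed Future, passes it through withCircuitBreaker, and then uses Await.result to turn the asynchronous result back into a synchronous one.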

/**
   * Wraps invocations of synchronous calls that need to be protected
   *
   * Calls are run in caller's thread. Because of the synchronous nature of
   * this call the  `scala.concurrent.TimeoutException` will only be thrown
   * after the body has completed.
   *
   * Throws java.util.concurrent.TimeoutException if the call timed out.
   *
   * @param body Call needing protected
   * @param defineFailureFn function that define what should be consider failure and thus increase failure count
   * @return The result of the call
   */
  def withSyncCircuitBreaker[T](body: => T, defineFailureFn: Try[T] => Boolean): T =
    Await.result(
      withCircuitBreaker(
        try Future.successful(body) catch { case NonFatal(t) => Future.failed(t) },
        defineFailureFn),
      callTimeout)

  The code above explains the difference at its root, so I won't analyze it any further.
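  The difference can be reproduced with a small hypothetical sketch: protectAsync stands in for withCircuitBreaker (without any breaker logic), and protectSync copies the shape of withSyncCircuitBreaker. Because body runs to completion on the caller's thread before any Future exists, no timeout mechanism gets a chance to interrupt it; at best a timeout can be reported after body has returned, as the Scaladoc above notes.

```scala
import scala.concurrent.{ Await, Future }
import scala.concurrent.duration._
import scala.util.control.NonFatal

object SyncWrapper {
  // Hypothetical stand-in for withCircuitBreaker (no breaker logic at all)
  def protectAsync[T](body: => Future[T]): Future[T] =
    try body catch { case NonFatal(t) => Future.failed(t) }

  // Same shape as withSyncCircuitBreaker: `body` runs on the caller's thread,
  // its outcome is wrapped in an already completed Future, then we block on it
  def protectSync[T](body: => T, timeout: FiniteDuration): T =
    Await.result(
      protectAsync(try Future.successful(body) catch { case NonFatal(t) => Future.failed(t) }),
      timeout)
}
```

  A failure inside body resurfaces as the original exception, because Await.result rethrows the exception of a failed Future.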

Reference: 防雪崩利器:熔斷器 Hystrix 的原理與使用 ("An anti-avalanche tool: principles and usage of the Hystrix circuit breaker")
