Akka Source Code Analysis: CircuitBreaker
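The circuit breaker is a technique that shows up in many technology stacks. In a distributed system it provides a stable mechanism for stopping cascading failures.
How should we understand this? Put simply, in a distributed environment, when one compute node runs into trouble, the failure can easily propagate backwards to its callers, or even bring the whole system down like an avalanche. What does that mean? Suppose a request passes through three services in order, A, B and C, with A calling B. If B slows down for some reason, so that a response that used to take 100 milliseconds now takes 1 second, then every call in A also waits 1 second for B. Many calls in A are now stuck waiting, and under high concurrency this can easily bring down the node that hosts A. In other words, B's problem has been passed on to A. The circuit breaker exists to solve exactly this problem.
A circuit breaker sits between service A and service B, and A calls B through it. The breaker uses some algorithm to decide whether B is healthy. If B is not healthy, A's call fails immediately instead of waiting for 1 second, and B is not called at all. This shortens the time A needs to detect the failure and also reduces the load on B. Once B is healthy again, A can call it normally.
I will not go on about the principles and usage of circuit breakers here; readers can refer to the first blog post in the references. This article only analyzes how the circuit breaker is implemented in Akka.
As usual, we start from the official Akka demo.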
import scala.concurrent.duration._
import scala.concurrent.Future
import akka.actor.{ Actor, ActorLogging }
import akka.pattern.CircuitBreaker
import akka.pattern.pipe

class DangerousActor extends Actor with ActorLogging {
  // The actor's dispatcher serves as the ExecutionContext for the breaker and the Future
  import context.dispatcher

  val breaker =
    new CircuitBreaker(
      context.system.scheduler,
      maxFailures = 5,
      callTimeout = 10.seconds,
      resetTimeout = 1.minute).onOpen(notifyMeOnOpen())

  def notifyMeOnOpen(): Unit =
    log.warning("My CircuitBreaker is now open, and will not close for one minute")

  def dangerousCall: String = "This really isn't that dangerous of a call after all"

  def receive = {
    // Asynchronous variant: the protected Future is piped back to the sender
    case "is my middle name" =>
      breaker.withCircuitBreaker(Future(dangerousCall)) pipeTo sender()
    // Synchronous variant: the call runs in the caller's thread
    case "block for me" =>
      sender() ! breaker.withSyncCircuitBreaker(dangerousCall)
  }
}
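First, let's look at how the CircuitBreaker is created. The demo passes four parameters, and we focus on the last three: the maximum number of failures, the call timeout, and the reset timeout.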
/**
 * Provides circuit breaker functionality to provide stability when working with "dangerous" operations, e.g. calls to
 * remote systems
 *
 * Transitions through three states:
 * - In *Closed* state, calls pass through until the `maxFailures` count is reached. This causes the circuit breaker
 * to open. Both exceptions and calls exceeding `callTimeout` are considered failures.
 * - In *Open* state, calls fail-fast with an exception. After `resetTimeout`, circuit breaker transitions to
 * half-open state.
 * - In *Half-Open* state, the first call will be allowed through, if it succeeds the circuit breaker will reset to
 * closed state. If it fails, the circuit breaker will re-open to open state. All calls beyond the first that
 * execute while the first is running will fail-fast with an exception.
 *
 * @param scheduler Reference to Akka scheduler
 * @param maxFailures Maximum number of failures before opening the circuit
 * @param callTimeout [[scala.concurrent.duration.FiniteDuration]] of time after which to consider a call a failure
 * @param resetTimeout [[scala.concurrent.duration.FiniteDuration]] of time after which to attempt to close the circuit
 * @param executor [[scala.concurrent.ExecutionContext]] used for execution of state transition listeners
 */
class CircuitBreaker(
  scheduler: Scheduler,
  maxFailures: Int,
  callTimeout: FiniteDuration,
  val resetTimeout: FiniteDuration,
  maxResetTimeout: FiniteDuration,
  exponentialBackoffFactor: Double)(implicit executor: ExecutionContext) extends AbstractCircuitBreaker
Using it is straightforward: pass the code you want to run to withCircuitBreaker or withSyncCircuitBreaker.
/**
 * Wraps invocations of asynchronous calls that need to be protected
 *
 * @param body Call needing protected
 * @return [[scala.concurrent.Future]] containing the call result or a
 * `scala.concurrent.TimeoutException` if the call timed out
 *
 */
def withCircuitBreaker[T](body: => Future[T]): Future[T] =
  currentState.invoke(body, CircuitBreaker.exceptionAsFailure)
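The parameter of withCircuitBreaker is a body, passed by name; you can loosely think of it as a function pointer, since the expression is not evaluated until the method actually references it. The code is simple: it just calls currentState.invoke. So what is currentState? Remember the three states of a circuit breaker: Open, Closed and Half-Open. Obviously the breaker should be in the Closed state on the first call, so the body code is executed normally.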
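To make the by-name parameter concrete, here is a minimal sketch that does not depend on Akka. The names ByNameDemo, guarded and expensive are made up for illustration; the point is only that the expression passed as body is not evaluated when the guard rejects the call, which is what lets a breaker skip the protected code entirely.

```scala
object ByNameDemo {
  // Counts how many times the protected expression was really evaluated.
  var evaluations = 0

  // Hypothetical "dangerous" operation used only for this illustration.
  def expensive(): Int = { evaluations += 1; 42 }

  // `body` is a by-name parameter: the expression is not evaluated at the
  // call site, only when (and each time) `body` is referenced in here.
  def guarded[T](allow: Boolean)(body: => T): Option[T] =
    if (allow) Some(body) else None
}
```

Calling ByNameDemo.guarded(allow = false)(ByNameDemo.expensive()) returns None without ever running expensive().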
/**
 * Helper method for accessing underlying state via Unsafe
 *
 * @return Reference to current state
 */
@inline
private[this] def currentState: State =
  Unsafe.instance.getObjectVolatile(this, AbstractCircuitBreaker.stateOffset).asInstanceOf[State]
The type of currentState is State.
State has three implementations: Closed, HalfOpen and Open.
/**
 * Implementation of invoke, which simply attempts the call
 *
 * @param body Implementation of the call that needs protected
 * @return Future containing result of protected call
 */
override def invoke[T](body: => Future[T], defineFailureFn: Try[T] => Boolean): Future[T] =
  callThrough(body, defineFailureFn)
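Let's look at the invoke implementation of Closed first. It is very simple: it calls callThrough, which, judging by the name, should just call the body function directly.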
/**
 * Shared implementation of call across all states. Thrown exception or execution of the call beyond the allowed
 * call timeout is counted as a failed call, otherwise a successful call
 *
 * @param body Implementation of the call
 * @param defineFailureFn function that define what should be consider failure and thus increase failure count
 * @return Future containing the result of the call
 */
def callThrough[T](body: => Future[T], defineFailureFn: Try[T] => Boolean): Future[T] = {

  def materialize[U](value: => Future[U]): Future[U] =
    try value catch { case NonFatal(t) => Future.failed(t) }

  if (callTimeout == Duration.Zero) {
    val start = System.nanoTime()
    val f = materialize(body)

    f.onComplete {
      case s: Success[_] =>
        notifyCallSuccessListeners(start)
        callSucceeds()
      case Failure(ex) =>
        notifyCallFailureListeners(start)
        callFails()
    }

    f
  } else {
    val start = System.nanoTime()
    val p = Promise[T]()

    implicit val ec = sameThreadExecutionContext

    p.future.onComplete { fResult =>
      if (defineFailureFn(fResult)) {
        callFails()
      } else {
        notifyCallSuccessListeners(start)
        callSucceeds()
      }
    }

    val timeout = scheduler.scheduleOnce(callTimeout) {
      if (p tryFailure timeoutEx) {
        notifyCallTimeoutListeners(start)
      }
    }

    materialize(body).onComplete {
      case Success(result) =>
        p.trySuccess(result)
        timeout.cancel
      case Failure(ex) =>
        if (p.tryFailure(ex)) {
          notifyCallFailureListeners(start)
        }
        timeout.cancel
    }
    p.future
  }
}
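callThrough handles two cases: the body has a timeout, or it has no timeout (callTimeout is Duration.Zero, so the breaker simply waits until the body completes). We only analyze the case with a timeout. It is much the same as the implementation without a timeout, except that it involves an extra Promise.
First a Promise is created, and a completion handler is registered on it. When the Promise completes, defineFailureFn decides whether the result counts as a failure: on failure callFails is called, on success callSucceeds is called.
The materialize function is simple: it calls body directly and turns any non-fatal exception thrown while doing so into a failed Future. If the body call succeeds, the Promise is completed successfully; otherwise the Promise is failed. But what if body keeps running and exceeds the timeout?
That is where scheduler.scheduleOnce comes in. It starts a timer that fails the Promise when the time is up and then calls notifyCallTimeoutListeners. Note, however, that even when the call times out, body keeps running until it returns a result and exits. This follows from how Future works: a running Future cannot be cancelled from the outside.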
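The race between the body and the timer can be sketched without Akka. This is a simplified illustration, not Akka's implementation: TimeoutRaceDemo, withTimeout and demo are hypothetical names, and a java.util.concurrent scheduled executor stands in for Akka's scheduler. Whichever side completes the Promise first wins, and the demo shows that the body still runs to the end after the caller has already seen the timeout.

```scala
import java.util.concurrent.{Executors, TimeUnit, TimeoutException}
import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._
import scala.util.Try

object TimeoutRaceDemo {
  // Stand-in for Akka's scheduler (an assumption of this sketch).
  private val timer = Executors.newSingleThreadScheduledExecutor()

  // Races `body` against a timer; whichever completes the Promise first wins.
  def withTimeout[T](timeout: FiniteDuration)(body: => Future[T])(
      implicit ec: ExecutionContext): Future[T] = {
    val p = Promise[T]()
    val task = timer.schedule(new Runnable {
      def run(): Unit = { p.tryFailure(new TimeoutException("call timed out")); () }
    }, timeout.toMillis, TimeUnit.MILLISECONDS)
    body.onComplete { result =>
      p.tryComplete(result) // no effect if the timer already failed the Promise
      task.cancel(false)    // no effect if the timer already fired
    }
    p.future
  }

  // Returns (caller saw a failure, body still finished afterwards).
  def demo(): (Boolean, Boolean) = {
    import ExecutionContext.Implicits.global
    val finished = new AtomicBoolean(false)
    val f = withTimeout(100.millis) {
      Future { Thread.sleep(500); finished.set(true); "done" }
    }
    val failed = Try(Await.result(f, 2.seconds)).isFailure
    Thread.sleep(800) // give the abandoned body time to finish
    (failed, finished.get)
  }

  def shutdown(): Unit = timer.shutdown()
}
```

Because the timed-out body is not interrupted, a breaker protects the caller from waiting but does not by itself free the resources the slow call is holding.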
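For simplicity, let's first look at the case where the call succeeds without timing out. According to the source, notifyCallSuccessListeners(start) and callSucceeds() are executed. The notify listener methods such as notifyCallSuccessListeners are not analyzed here; they only call back functions defined by the developer.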
/**
 * On successful call, the failure count is reset to 0
 *
 * @return
 */
override def callSucceeds(): Unit = set(0)
Isn't the callSucceeds implementation of Closed a little too simple? It just calls set. What does set do? Let's look at the definition of Closed.
private object Closed extends AtomicInteger with State
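Closed is not only a State, it is also an AtomicInteger. In other words, it doubles as a thread-safe counter. When a call succeeds, the current value is set to 0. And when a call fails, it should presumably be incremented.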
/**
 * On failed call, the failure count is incremented. The count is checked against the configured maxFailures, and
 * the breaker is tripped if we have reached maxFailures.
 *
 * @return
 */
override def callFails(): Unit = if (incrementAndGet() == maxFailures) tripBreaker(Closed)
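Now look at callFails. It increments the current count by 1 and then checks whether the maximum number of failures has been reached. If it has, tripBreaker is called.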
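The counter behaviour of Closed can be sketched as a small standalone class. FailureCounter and onTrip are hypothetical names used only here; the sketch mirrors the two methods above, where the state object itself is the AtomicInteger.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Simplified stand-in for the Closed state: the object itself is the counter.
// `onTrip` is a hypothetical callback that plays the role of tripBreaker.
class FailureCounter(maxFailures: Int, onTrip: () => Unit) extends AtomicInteger {
  // A success wipes out the failures counted so far.
  def callSucceeds(): Unit = set(0)

  // Only the increment that lands exactly on maxFailures trips the breaker,
  // so concurrent failures beyond the threshold do not trip it again.
  def callFails(): Unit = if (incrementAndGet() == maxFailures) onTrip()
}
```

One consequence is that failures only add up if no success happens in between: with maxFailures = 3, the sequence fail, fail, success, fail, fail does not trip the breaker.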
/**
 * Trips breaker to an open state. This is valid from Closed or Half-Open states.
 *
 * @param fromState State we're coming from (Closed or Half-Open)
 */
private def tripBreaker(fromState: State): Unit = transition(fromState, Open)

/**
 * Implements consistent transition between states. Throws IllegalStateException if an invalid transition is attempted.
 *
 * @param fromState State being transitioning from
 * @param toState State being transitioning from
 */
private def transition(fromState: State, toState: State): Unit = {
  if (swapState(fromState, toState))
    toState.enter()
  // else some other thread already swapped state
}
So once the maximum number of failures is reached, the current state is switched to Open and the enter method of the new state is called.
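The "swap, then enter only if the swap succeeded" pattern in transition can be illustrated with a plain AtomicReference. This is a sketch under assumptions: StateHolder is a made-up class, and Akka itself swaps a field through Unsafe rather than using an AtomicReference.

```scala
import java.util.concurrent.atomic.AtomicReference

// Hypothetical holder for the current state of a breaker.
class StateHolder[S <: AnyRef](initial: S) {
  private val ref = new AtomicReference[S](initial)

  def current: S = ref.get

  // Runs `onEnter` only for the caller that wins the compare-and-set.
  // A caller that loses does nothing, because another thread has
  // already performed the transition.
  def transition(from: S, to: S)(onEnter: S => Unit): Boolean = {
    val swapped = ref.compareAndSet(from, to)
    if (swapped) onEnter(to)
    swapped
  }
}
```

If two threads both observe the final failure and try to move from Closed to Open, only one compare-and-set succeeds, so the enter logic of the Open state runs once.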
/**
 * On entering this state, schedule an attempted reset via [[akka.actor.Scheduler]] and store the entry time to
 * calculate remaining time before attempted reset.
 *
 * @return
 */
override def _enter(): Unit = {
  set(System.nanoTime())
  scheduler.scheduleOnce(currentResetTimeout) {
    attemptReset()
  }
  val nextResetTimeout = currentResetTimeout * exponentialBackoffFactor match {
    case f: FiniteDuration => f
    case _ => currentResetTimeout
  }

  if (nextResetTimeout < maxResetTimeout)
    swapResetTimeout(currentResetTimeout, nextResetTimeout)
}
Above is the enter logic of Open (the _enter method). The first thing it does is call set. What is set?
private object Open extends AtomicLong with State
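Judging from the definition, set is a method of AtomicLong. It records the time at which the Open state was entered. Then a timer is started for the reset timeout, and the next reset timeout is scaled by exponentialBackoffFactor as long as the result stays below maxResetTimeout. So what does attemptReset do when the timer fires?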
/**
 * Attempts to reset breaker by transitioning to a half-open state. This is valid from an Open state only.
 *
 */
private def attemptReset(): Unit = transition(Open, HalfOpen)
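Well, it simply moves the breaker to the Half-Open state. Before going there, let's see what invoke does once the failure count has reached the limit in the Closed state and the breaker has entered Open.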
/**
 * Fail-fast on any invocation
 *
 * @param body Implementation of the call that needs protected
 * @return Future containing result of protected call
 */
override def invoke[T](body: => Future[T], defineFailureFn: Try[T] => Boolean): Future[T] = {
  notifyCallBreakerOpenListeners()
  Future.failed(new CircuitBreakerOpenException(remainingDuration()))
}
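The invoke logic here is rather simple: it notifies the listeners and then directly returns a failed Future. The body code is never called. In other words, in the Open state every call fails immediately, until the reset timeout elapses and the breaker moves to the HalfOpen state.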
/**
 * On entry, guard should be reset for that first call to get in
 *
 * @return
 */
override def _enter(): Unit = set(true)
Entering the Half-Open state again calls set, only this time with the value true, so HalfOpen must be an AtomicBoolean.
/**
 * Allows a single call through, during which all other callers fail-fast. If the call fails, the breaker reopens.
 * If the call succeeds the breaker closes.
 *
 * @param body Implementation of the call that needs protected
 * @return Future containing result of protected call
 */
override def invoke[T](body: => Future[T], defineFailureFn: Try[T] => Boolean): Future[T] =
  if (compareAndSet(true, false))
    callThrough(body, defineFailureFn)
  else {
    notifyCallBreakerOpenListeners()
    Future.failed[T](new CircuitBreakerOpenException(0.seconds))
  }
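Now let's look at invoke. It first checks whether the current value is true, and if so sets it to false. Obviously the value is true only for the first call. In other words, after moving from Open to HalfOpen, only the first invoke executes callThrough and therefore runs body; a second call arriving while the breaker is still in HalfOpen fails immediately.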
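The single-trial guard can be sketched on its own. SingleTrialGate, tryCall and reenter are hypothetical names for this illustration; as in HalfOpen, the object itself is the AtomicBoolean and compareAndSet(true, false) lets exactly one caller through.

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Simplified stand-in for HalfOpen: the object itself is the "first call" guard.
class SingleTrialGate extends AtomicBoolean(true) {
  // Returns Some(result) for the single caller that wins the
  // compare-and-set, and None for every other caller.
  def tryCall[T](body: => T): Option[T] =
    if (compareAndSet(true, false)) Some(body) else None

  // Mirrors _enter(): re-arms the guard when the state is entered again.
  def reenter(): Unit = set(true)
}
```

Because the check and the update happen in one atomic step, two threads calling tryCall at the same moment cannot both run the trial call.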
We know that callThrough calls callSucceeds when body succeeds and callFails when it fails. Let's look at how HalfOpen implements these two functions.
/**
 * Reset breaker on successful call.
 *
 * @return
 */
override def callSucceeds(): Unit = resetBreaker()

/**
 * Reopen breaker on failed call.
 *
 * @return
 */
override def callFails(): Unit = tripBreaker(HalfOpen)

/**
 * Resets breaker to a closed state. This is valid from an Half-Open state only.
 *
 */
private def resetBreaker(): Unit = transition(HalfOpen, Closed)
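Take the success case first. Following the calls in the source, it simply switches the state from HalfOpen to Closed. We have already analyzed the Closed state. So how is failure handled?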
/**
 * Trips breaker to an open state. This is valid from Closed or Half-Open states.
 *
 * @param fromState State we're coming from (Closed or Half-Open)
 */
private def tripBreaker(fromState: State): Unit = transition(fromState, Open)
On failure the breaker goes back to the Open state, which we have also analyzed already.
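That completes the analysis of the circuit breaker logic in Akka. Quite simple, isn't it? In short: the breaker starts in the Closed state, where user logic runs normally. When the number of failures, timeouts included, reaches the configured limit without a success in between, the breaker enters the Open state. In the Open state user logic is not executed and calls fail immediately; after the configured time the breaker enters the HalfOpen state. In the HalfOpen state, if the first call succeeds the breaker goes to Closed, and if it fails the breaker goes back to Open and waits for the configured time again.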
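The whole cycle can be condensed into a small synchronous sketch. This is deliberately not Akka's implementation: TinyBreaker is a made-up, single-threaded class, it uses an injected clock (the hypothetical now function, in milliseconds) instead of a scheduler, it checks the Open to HalfOpen transition lazily on each access, and it has no call timeout or backoff.

```scala
import scala.util.{Failure, Success, Try}

object TinyBreaker {
  sealed trait State
  case object Closed extends State
  case object Open extends State
  case object HalfOpen extends State
}

// Simplified, single-threaded sketch of the three-state logic.
class TinyBreaker(maxFailures: Int, resetTimeoutMs: Long, now: () => Long) {
  import TinyBreaker._

  private var state: State = Closed
  private var failures = 0
  private var openedAt = 0L

  def currentState: State = {
    // Akka schedules this transition with a timer; here it is checked lazily.
    if (state == Open && now() - openedAt >= resetTimeoutMs) state = HalfOpen
    state
  }

  def call[T](body: => T): Try[T] = currentState match {
    case Open =>
      // Fail fast: `body` is never evaluated while the breaker is open.
      Failure(new IllegalStateException("circuit breaker is open"))
    case _ =>
      val result = Try(body)
      result match {
        case Success(_) =>
          failures = 0
          state = Closed
        case Failure(_) =>
          failures += 1
          // A failed trial call in HalfOpen reopens the breaker at once.
          if (state == HalfOpen || failures >= maxFailures) {
            state = Open
            openedAt = now()
          }
      }
      result
  }
}
```

With maxFailures = 2, two failing calls in a row open the breaker, further calls fail without running their body, and once the injected clock has advanced by resetTimeoutMs a single successful call closes it again.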
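In fact, the state diagram in the official documentation explains all of this. Some people ask what the difference is between withSyncCircuitBreaker and withCircuitBreaker. There is not much of one: the first is synchronous and the second is asynchronous. The synchronous version is built on the asynchronous one and uses Await.result to turn it into a blocking call.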
/**
 * Wraps invocations of synchronous calls that need to be protected
 *
 * Calls are run in caller's thread. Because of the synchronous nature of
 * this call the `scala.concurrent.TimeoutException` will only be thrown
 * after the body has completed.
 *
 * Throws java.util.concurrent.TimeoutException if the call timed out.
 *
 * @param body Call needing protected
 * @param defineFailureFn function that define what should be consider failure and thus increase failure count
 * @return The result of the call
 */
def withSyncCircuitBreaker[T](body: => T, defineFailureFn: Try[T] => Boolean): T =
  Await.result(
    withCircuitBreaker(
      try Future.successful(body) catch { case NonFatal(t) => Future.failed(t) },
      defineFailureFn),
    callTimeout)
The code above explains the fundamental difference between the two, so I will not analyze it any further.
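One detail in that snippet is worth a small illustration: Future.successful(body) evaluates body on the calling thread before any Future exists, which is why the scaladoc says a timeout can only be reported after the body has completed. The sketch below isolates that wrapping step; SyncWrapDemo and runOnCallerThread are hypothetical names.

```scala
import scala.concurrent.Future
import scala.util.control.NonFatal

object SyncWrapDemo {
  // Evaluates `body` on the calling thread and wraps the outcome in an
  // already-completed Future, mirroring the try/catch in the snippet above.
  def runOnCallerThread[T](body: => T): Future[T] =
    try Future.successful(body) catch { case NonFatal(t) => Future.failed(t) }
}
```

The returned Future is already completed when runOnCallerThread returns, so nothing runs in the background, and a body that throws simply yields a failed Future.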
An Anti-Avalanche Tool: Principles and Usage of the Hystrix Circuit Breaker (防雪崩利器:熔斷器 Hystrix 的原理與使用)