
BackoffSupervisor in Akka


1. Background

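I have recently been working on a project whose modules exchange audio frames over Akka gRPC, and the actors in each module are persistent (they call persist). This week I hit a bug: after audio frames had been flowing for a while, the whole system hung and stopped responding. At first I suspected that the Akka gRPC stream had been interrupted or that frames were not arriving, but packet captures and logs showed that every frame of the stream had in fact been received. The problem was eventually traced to persist: before every hang, a persist failure showed up. Looking at the source of the persist method, we find the following Scaladoc comment: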

/**
* Asynchronously persists `event`. On successful persistence, `handler` is called with the
* persisted event. It is guaranteed that no new commands will be received by a persistent actor
* between a call to `persist` and the execution of its `handler`. This also holds for
* multiple `persist` calls per received command. Internally, this is achieved by stashing new
* commands and unstashing them when the `event` has been persisted and handled. The stash used
* for that is an internal stash which doesn't interfere with the inherited user stash.
*
* An event `handler` may close over persistent actor state and modify it. The `sender` of a persisted
* event is the sender of the corresponding command. This means that one can reply to a command
* sender within an event `handler`.
*
* Within an event handler, applications usually update persistent actor state using persisted event
* data, notify listeners and reply to command senders.
*
* If persistence of an event fails, [[#onPersistFailure]] will be invoked and the actor will
* unconditionally be stopped. The reason that it cannot resume when persist fails is that it
* is unknown if the event was actually persisted or not, and therefore it is in an inconsistent
* state. Restarting on persistent failures will most likely fail anyway, since the journal
* is probably unavailable. It is better to stop the actor and after a back-off timeout start
* it again.
*
* @param event event to be persisted
* @param handler handler for each persisted `event`
*/
def persist[A](event: A)(handler: A => Unit): Unit = {
  internalPersist(event)(handler)
}

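This comment tells us that when we use Akka's persist feature and persisting an event fails, the corresponding actor is stopped. Any message sent to it afterwards therefore gets no response at all. The comment also recommends stopping the actor and starting it again after a back-off timeout. (A side note: when persisting actor events to a local journal, or to a database through a plugin such as Redis, it is best to configure a serializer explicitly; otherwise Akka falls back to Java serialization and logs a WARN, because the default Java serialization is not very efficient.) So let's look at this back-off restart approach.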
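Because a failed persist stops the actor rather than throwing an exception to its parent, the back-off variant that fits this case is the one that reacts to the child's termination; Akka's persistence documentation wraps persistent actors with `BackoffOpts.onStop` for this purpose (`BackoffOpts` is the newer options builder that supersedes the `Backoff` object used later in this post). The sketch below runs without a journal: `FlakyWorker`, `OnStopDemo` and the `"fail"`/`"ping"` messages are hypothetical, and the worker calls `context.stop(self)` to mimic the unconditional stop that follows a persist failure.

```scala
import akka.actor.{Actor, ActorLogging, ActorRef, ActorSystem, Props}
import akka.pattern.{ask, BackoffOpts, BackoffSupervisor}
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical stand-in for a persistent actor: on "fail" it stops itself,
// which is what happens to a persistent actor whose persist call fails.
class FlakyWorker extends Actor with ActorLogging {
  override def preStart(): Unit = log.info("FlakyWorker started: {}", self)

  def receive: Receive = {
    case "fail" => context.stop(self)
    case msg    => sender() ! s"handled $msg"
  }
}

object OnStopDemo {
  // onStop: recreate the child whenever it terminates, waiting 1 s at first,
  // growing up to 30 s, plus up to 20% random jitter.
  val props: Props = BackoffSupervisor.props(
    BackoffOpts.onStop(
      Props(new FlakyWorker),
      childName = "worker",
      minBackoff = 1.second,
      maxBackoff = 30.seconds,
      randomFactor = 0.2
    )
  )

  // Returns (child before the stop, child after the back-off, reply from the new child).
  def runScenario(): (Option[ActorRef], Option[ActorRef], Any) = {
    val system = ActorSystem("onStopDemo")
    implicit val timeout: Timeout = Timeout(3.seconds)
    try {
      val supervisor = system.actorOf(props, "workerSupervisor")
      // GetCurrentChild is answered by the supervisor itself, not forwarded to the child.
      def currentChild(): Option[ActorRef] =
        Await.result((supervisor ? BackoffSupervisor.GetCurrentChild)
          .mapTo[BackoffSupervisor.CurrentChild], timeout.duration).ref

      val before = currentChild()
      supervisor ! "fail" // forwarded to the worker, which stops itself
      Thread.sleep(2500)  // longer than the first back-off (at most 1.2 s here)
      val after = currentChild()
      val reply = Await.result(supervisor ? "ping", timeout.duration)
      (before, after, reply)
    } finally {
      Await.result(system.terminate(), 10.seconds)
    }
  }

  def main(args: Array[String]): Unit = println(runScenario())
}
```

The child keeps its name (`worker`) across restarts, but it is a new actor incarnation, so the `ActorRef` returned before and after the stop differ.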

2. Ordinary Supervision and Restart

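Actors form parent-child hierarchies. When a child actor throws an exception, the failure is reported to its parent, which handles it according to a supervision strategy. The most commonly used strategy is OneForOneStrategy, which applies the chosen directive only to the child that failed; the strategy defines how each kind of exception thrown by a child is handled. The default handling is as follows: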

final val defaultDecider: Decider = {
  case _: ActorInitializationException => Stop
  case _: ActorKilledException         => Stop
  case _: DeathPactException           => Stop
  case _: Exception                    => Restart
}

We can, however, add our own special cases, for example:

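// Inside an Actor; requires:
//   import akka.actor.OneForOneStrategy
//   import akka.actor.SupervisorStrategy._
//   import scala.concurrent.duration._
override val supervisorStrategy =
  OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1.minute) {
    case _: ArithmeticException => Resume
    case _: MyException         => Restart // MyException: an application-defined exception
    case t =>
      super.supervisorStrategy.decider.applyOrElse(t, (_: Any) => Escalate)
  }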
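Since `SupervisorStrategy.Decider` is just a `PartialFunction[Throwable, Directive]`, the decision for a given exception can be checked directly, without starting any actors. The sketch below chains a custom decider with Akka's `SupervisorStrategy.defaultDecider` and, like Akka's own strategies, falls back to `Escalate` for anything neither covers; `MyException` and `DeciderDemo` are hypothetical names.

```scala
import akka.actor.SupervisorStrategy
import akka.actor.SupervisorStrategy.{Decider, Directive, Escalate, Restart, Resume}

// Hypothetical application-specific exception (the MyException of the snippet above).
class MyException extends Exception

object DeciderDemo {
  // Our own cases take priority over Akka's defaults.
  val custom: Decider = {
    case _: ArithmeticException => Resume
    case _: MyException         => Restart
  }

  // custom first, then defaultDecider (which restarts on any other Exception);
  // Throwables matched by neither, such as Errors, escalate.
  def decide(t: Throwable): Directive =
    custom.orElse(SupervisorStrategy.defaultDecider).applyOrElse(t, (_: Throwable) => Escalate)

  def main(args: Array[String]): Unit = {
    println(decide(new ArithmeticException("/ by zero")))   // Resume
    println(decide(new MyException))                        // Restart
    println(decide(new IllegalStateException("bad state"))) // Restart
    println(decide(new AssertionError("boom")))             // Escalate
  }
}
```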

3. BackoffSupervisor

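However, if our actor is started with system.actorOf, it is a top-level actor whose parent is the user guardian, so it is hard to customize its failure handling in the way shown above. We may also want finer-grained control, for example over how long to wait after the actor fails before handling it. In such cases we can use BackoffSupervisor.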
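The "how long to wait" part is governed by three parameters: `minBackoff`, `maxBackoff` and `randomFactor`. Per the Akka documentation, the restart delay grows exponentially from `minBackoff`, is capped at `maxBackoff`, and then an extra random delay of up to `randomFactor` is added (0.2 means up to 20%). The plain-Scala model below illustrates that rule and is not Akka's own code; `BackoffDelayModel` is a hypothetical name, and the assumption that the delay doubles on each restart is stated in the comments.

```scala
import scala.concurrent.duration._
import scala.util.Random

// A model of the documented back-off rule (not Akka's implementation):
// assume the delay doubles from minBackoff on each restart, is capped at maxBackoff,
// and is then stretched by a random factor in [1.0, 1.0 + randomFactor).
object BackoffDelayModel {
  def delay(
      restartCount: Int,
      minBackoff: FiniteDuration,
      maxBackoff: FiniteDuration,
      randomFactor: Double,
      rnd: Random = new Random()
  ): FiniteDuration = {
    // Work in Double nanoseconds so large restart counts cannot overflow a Long.
    val exponentialNanos = minBackoff.toNanos.toDouble * math.pow(2, restartCount)
    val cappedNanos = math.min(exponentialNanos, maxBackoff.toNanos.toDouble)
    val jitter = 1.0 + rnd.nextDouble() * randomFactor
    (cappedNanos * jitter).toLong.nanos
  }

  def main(args: Array[String]): Unit = {
    // 1 s minimum, 5 s maximum, no randomness.
    val delays = (0 to 4).map(n => delay(n, 1.second, 5.seconds, 0.0))
    println(delays.map(_.toMillis).mkString(", ")) // 1000, 2000, 4000, 5000, 5000
  }
}
```

With the values used in the example below (1 second, 5 seconds, 0.0), the model gives waits of 1, 2 and 4 seconds for the first three restarts and 5 seconds from then on.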

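We can think of BackoffSupervisor as follows. A BackoffSupervisor differs from an ordinary actor that defines a supervisorStrategy: it should be viewed as a single, integrated actor. Internally it is still implemented as a parent-child pair of actors, and the supervision strategy (SupervisorStrategy) lives inside the BackoffSupervisor. From the outside, a BackoffSupervisor looks like one actor: the business logic is defined in the child actor, and the parent actor does nothing except supervise. There is not even a place for us to define behavior for the parent; its only job, built into BackoffSupervisor, is to forward the messages it receives to the child. So although we send messages to the BackoffSupervisor, we are actually talking to its child. Let's look at an example of how to use it: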

import akka.actor._
import akka.pattern.{Backoff, BackoffSupervisor}
import scala.concurrent.duration._

object InnerChild {
  case class TestMessage(msg: String)
  class ChildException extends Exception

  def props = Props[InnerChild]
}

class InnerChild extends Actor with ActorLogging {
  import InnerChild._

  override def receive: Receive = {
    case TestMessage(msg) => // simulates the child's actual work
      log.info(s"Child received message: ${msg}")
  }
}

object Supervisor {
  def props: Props = { // defines the supervision strategy and how the child actor is built
    def decider: PartialFunction[Throwable, SupervisorStrategy.Directive] = {
      case _: InnerChild.ChildException => SupervisorStrategy.Restart
    }

    // Restart after 1 s at first, backing off up to 5 s, with no random jitter.
    // withManualReset: the back-off is only reset when the child sends BackoffSupervisor.Reset.
    val options = Backoff.onFailure(InnerChild.props, "innerChild", 1.second, 5.seconds, 0.0)
      .withManualReset
      .withSupervisorStrategy(
        OneForOneStrategy(maxNrOfRetries = 5, withinTimeRange = 5.seconds)(
          decider.orElse(SupervisorStrategy.defaultDecider)
        )
      )
    BackoffSupervisor.props(options)
  }
}

// Note: the actor below is the parent of Supervisor, not of InnerChild
object ParentalActor {
  case class SendToSupervisor(msg: InnerChild.TestMessage)
  case class SendToInnerChild(msg: InnerChild.TestMessage)
  case class SendToChildSelection(msg: InnerChild.TestMessage)
  def props = Props[ParentalActor]
}

class ParentalActor extends Actor with ActorLogging {
  import ParentalActor._

  // create the child actor "supervisor" here
  val supervisor = context.actorOf(Supervisor.props, "supervisor")
  supervisor ! BackoffSupervisor.GetCurrentChild // ask the supervisor for its current child actor
  var innerChild: Option[ActorRef] = None // the current child's ActorRef, once it is returned
  val selectedChild = context.actorSelection("/user/parent/supervisor/innerChild")

  override def receive: Receive = {
    case BackoffSupervisor.CurrentChild(ref) => // child actor info received
      innerChild = ref
    case SendToSupervisor(msg)     => supervisor ! msg
    case SendToChildSelection(msg) => selectedChild ! msg
    case SendToInnerChild(msg)     => innerChild.foreach(child => child ! msg)
  }
}

object BackoffSupervisorDemo extends App {
  import InnerChild.TestMessage
  import ParentalActor._

  val testSystem = ActorSystem("testSystem")
  val parent = testSystem.actorOf(ParentalActor.props, "parent")
  Thread.sleep(1000) // wait for BackoffSupervisor.CurrentChild(ref) to be received
  parent ! SendToSupervisor(TestMessage("Hello message 1 to supervisor"))
  parent ! SendToInnerChild(TestMessage("Hello message 2 to innerChild"))
  parent ! SendToChildSelection(TestMessage("Hello message 3 to selectedChild"))
  scala.io.StdIn.readLine()
  testSystem.terminate()
}

The output is as follows:

[INFO] [10/13/2018 16:11:48.167] [testSystem-akka.actor.default-dispatcher-2] [akka://testSystem/user/parent/supervisor/innerChild] Child received message: Hello message 1 to supervisor
[INFO] [10/13/2018 16:11:48.177] [testSystem-akka.actor.default-dispatcher-2] [akka://testSystem/user/parent/supervisor/innerChild] Child received message: Hello message 2 to innerChild
[INFO] [10/13/2018 16:11:48.179] [testSystem-akka.actor.default-dispatcher-3] [akka://testSystem/user/parent/supervisor/innerChild] Child received message: Hello message 3 to selectedChild

The output shows that although the example sends messages to the supervisor, to innerChild and to selectedChild respectively, every message is ultimately handled by InnerChild.
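The example above never actually throws `ChildException`, so the back-off itself is not exercised. Also, with `withManualReset` the Akka documentation expects the child to send `BackoffSupervisor.Reset` to its parent (the BackoffSupervisor) once it is healthy again; otherwise the restart count, and with it the back-off delay, keeps growing. The sketch below covers both. `ResettingChild`, its `ChildException` and the `"boom"`/`"ping"` messages are hypothetical, and it uses `BackoffOpts.onFailure`, the newer equivalent of `Backoff.onFailure`.

```scala
import akka.actor.{Actor, ActorLogging, ActorSystem, OneForOneStrategy, Props, SupervisorStrategy}
import akka.pattern.{ask, BackoffOpts, BackoffSupervisor}
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._

object ResettingChild {
  class ChildException extends Exception("simulated failure")

  // Same back-off settings as Supervisor above: 1 s to 5 s, no jitter, manual reset.
  val supervisorProps: Props = BackoffSupervisor.props(
    BackoffOpts
      .onFailure(Props(new ResettingChild), "child", 1.second, 5.seconds, 0.0)
      .withManualReset
      .withSupervisorStrategy(OneForOneStrategy() {
        case _: ChildException => SupervisorStrategy.Restart
      })
  )

  // Returns (restart count after the failure, reply from the new child, restart count after that).
  def runScenario(): (Int, Any, Int) = {
    val system = ActorSystem("resetDemo")
    implicit val timeout: Timeout = Timeout(3.seconds)
    try {
      val supervisor = system.actorOf(supervisorProps, "childSupervisor")
      def restartCount(): Int =
        Await.result((supervisor ? BackoffSupervisor.GetRestartCount)
          .mapTo[BackoffSupervisor.RestartCount], timeout.duration).count

      supervisor ! "boom" // child throws; it is stopped and recreated after 1 s
      Thread.sleep(2000)
      val afterFailure = restartCount()
      val reply = Await.result(supervisor ? "ping", timeout.duration)
      (afterFailure, reply, restartCount())
    } finally {
      Await.result(system.terminate(), 10.seconds)
    }
  }
}

class ResettingChild extends Actor with ActorLogging {
  override def preStart(): Unit = log.info("child started: {}", self)

  def receive: Receive = {
    case "boom" => throw new ResettingChild.ChildException
    case msg =>
      // Healthy again: ask the BackoffSupervisor (our parent) to reset the back-off.
      context.parent ! BackoffSupervisor.Reset
      sender() ! s"handled $msg"
  }
}
```

The child sends `Reset` before replying, so by the time the caller has the reply and asks for the restart count, the supervisor has already processed the reset.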
