1. 程式人生 > >AKKA文件(java版)—容錯

AKKA文件(java版)—容錯

原文連結  譯者:小魚

正如角色系統這一章中解釋的一樣,每一個角色都是它孩子的監管者,並且像這樣的角色都會定義錯誤處理監管策略。這個策略在成為角色系統結構的一個完整部分之後是不能被改變的。

錯誤處理實踐

首先,讓我們看一個處理資料儲存錯誤的例子,它是實踐應用中一個典型的錯誤根源。當然它基於真實應用,這個應用的資料儲存有可能是無效的,但我們在這個例子中會用一個最有效的重連方法來實現。
讀下面的原始碼。內嵌的註釋解釋了錯誤處理的不同塊和為什麼要新增它們。強烈的建議去執行這個例子,這樣才能更簡單的去跟蹤這個日誌輸出,來了解執行的時候發生了什麼。

建立一個監管策略

下面的章節解釋了錯誤處理機制和更深入的選擇。
根據示範的目的,讓我們來考慮如下策略:

private static SupervisorStrategy strategy =
new OneForOneStrategy(10, Duration.create("1 minute"),
new Function<Throwable, Directive>() {
@Override
public Directive apply(Throwable t) {
if (t instanceof ArithmeticException) {
return resume();
} else if (t instanceof NullPointerException) {
return restart();
} else if (t instanceof IllegalArgumentException) {
return stop();
} else {
return escalate();
}
}
});

@Override
public SupervisorStrategy supervisorStrategy() {
return strategy;
}

我選擇了一些大家熟知的異常型別,是為了展示在監管和監視章節中描述的錯誤處理指令的應用。首先,它是一個一對一的策略,意味著每一個孩子都是分開處理的(多對一的策略工作非常類似,唯一的不同就是任何決策都會應用到監管者的所有孩子,不僅僅是發生錯誤的那個)。在重啟頻率上會有一些限制,最大是每分鐘重啟10次。-1和Duration.Inf()意味著限制沒有應用,拋開這個可能性,去指定一個絕對的上限或者去讓這個重啟的工作沒有上限。當超出這個限制,孩子角色就被停止。

注意:如果策略在監管角色(而不是一個單獨的類)中描述了,它的決策者可以線上程安全的形勢下訪問角色的所有內部狀態,包括獲得當前發生錯誤的孩子的引用(例如錯誤訊息的getSender)。

預設監管策略
如果定義的策略沒有覆蓋丟擲的異常,Escalate會被使用。當沒有為一個角色定義監管策略,如下的異常會按照預設來處理:
1. ActorInitializationException會停止發生錯誤的子角色
2. ActorKilledException會停止發生錯誤的子角色
3. Exception會重啟發生錯誤的子角色
4. 別的丟擲型別會升級到父角色
如果異常升級到根監管者,會按上述的預設策略處理。

停止監管策略

跟Erlang方式類似的策略是當它們失敗的時候只停止子角色,以及當DeathWatch通知丟失的子角色的時候會對監管者採取正確的動作。

記錄角色失敗的資訊

預設SupervisorStrategy會記錄失敗資訊除非它們被向上升級。建議在更高層次的結構中處理上升的錯誤,並潛在的記錄下來。
在初始化的時候你可以通過設定SupervisorStrategy的loggingEnabled為false來去掉預設的日誌。可以在Decider裡定製日誌。注意如getSender一樣,當SupervisorStrategy在監管角色中描述,當前失敗的子角色引用是有效的。
你可以通重寫logFailure方法在你自己的SupervisorStrategy實現中定製化日誌。

最高層次角色的監管

最高層次角色意味著它們是通過system.actorOf()建立的,並且它們是User Guardian的孩子。在這種情況下沒有特定的規則,守護者僅僅應用配置策略。

測試應用

下面章節展示了實踐中不同指令的效果,wherefor測試啟動是需要的。首先,我們需要一個匹配的監管者。

public class Supervisor extends UntypedActor {

private static SupervisorStrategy strategy =
new OneForOneStrategy(10, Duration.create("1 minute"),
new Function<Throwable, Directive>() {
@Override
public Directive apply(Throwable t) {
if (t instanceof ArithmeticException) {
return resume();
} else if (t instanceof NullPointerException) {
return restart();
} else if (t instanceof IllegalArgumentException) {
return stop();
} else {
return escalate();
}
}
});

@Override
public SupervisorStrategy supervisorStrategy() {
return strategy;
}

public void onReceive(Object o) {
if (o instanceof Props) {
getSender().tell(getContext().actorOf((Props) o), getSelf());
} else {
unhandled(o);
}
}
}

這個監管者會被用於建立子角色,我們可以實驗一下:

public class Child extends UntypedActor {
int state = 0;

public void onReceive(Object o) throws Exception {
if (o instanceof Exception) {
throw (Exception) o;
} else if (o instanceof Integer) {
state = (Integer) o;
} else if (o.equals("get")) {
getSender().tell(state, getSelf());
} else {
unhandled(o);
}
}
}

測試使用Testing Actor Systems裡介紹的實用工具會比較簡單,TestProbe提供一個Actor Ref用於接收和檢查訊息回覆。

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.SupervisorStrategy;
import static akka.actor.SupervisorStrategy.resume;
import static akka.actor.SupervisorStrategy.restart;
import static akka.actor.SupervisorStrategy.stop;
import static akka.actor.SupervisorStrategy.escalate;
import akka.actor.SupervisorStrategy.Directive;
import akka.actor.OneForOneStrategy;
import akka.actor.Props;
import akka.actor.Terminated;
import akka.actor.UntypedActor;
import scala.collection.immutable.Seq;
import scala.concurrent.Await;
import static akka.pattern.Patterns.ask;
import scala.concurrent.duration.Duration;
import akka.testkit.AkkaSpec;
import akka.testkit.TestProbe;

public class FaultHandlingTest {
static ActorSystem system;
Duration timeout = Duration.create(5, SECONDS);

@BeforeClass
public static void start() {
system = ActorSystem.create("test", AkkaSpec.testConf());
}

@AfterClass
public static void cleanup() {
JavaTestKit.shutdownActorSystem(system);
system = null;
}

@Test
public void mustEmploySupervisorStrategy() throws Exception {
// code here
}

}

讓我們建立角色

Props superprops = Props.create(Supervisor.class);
ActorRef supervisor = system.actorOf(superprops, "supervisor");
ActorRef child = (ActorRef) Await.result(ask(supervisor,
Props.create(Child.class), 5000), timeout);

第一個測試會演示Resume指令,所以我們嘗試通過把角色的狀態設定成非初始化狀態並讓它失敗:

child.tell(42, ActorRef.noSender());
assert Await.result(ask(child, "get", 5000), timeout).equals(42);
child.tell(new ArithmeticException(), ActorRef.noSender());
assert Await.result(ask(child, "get", 5000), timeout).equals(42);

你可以看到值42讓錯誤處理指令存活下來。現在,如果我們把錯誤改成一個更嚴重的NullPointerException異常,則將不會是這種情況:

child.tell(new NullPointerException(), ActorRef.noSender());
assert Await.result(ask(child, "get", 5000), timeout).equals(0);

發生IllegalArgumentException致命異常的情況下,最終會導致子角色會被監管者中斷:

final TestProbe probe = new TestProbe(system);
probe.watch(child);
child.tell(new IllegalArgumentException(), ActorRef.noSender());
probe.expectMsgClass(Terminated.class);

到目前為止,監管者還沒有完全受到子角色失敗的影響,因為指令集會處理它。萬一丟擲一個異常,監管者會向上丟擲錯誤。

child = (ActorRef) Await.result(ask(supervisor,
Props.create(Child.class), 5000), timeout);
probe.watch(child);
assert Await.result(ask(child, "get", 5000), timeout).equals(0);
child.tell(new Exception(), ActorRef.noSender());
probe.expectMsgClass(Terminated.class);

監管者自己是由ActorSystem提供的最高等級的角色監管的,在所有的異常(注意這兩個異常

ActorInitializationException和ActorKilledException)情況下,預設策略是重新啟動。一旦預設的指令重啟去殺死所有的孩子,我們期望我們的窮孩子不要倖存這個錯誤。

這不是所希望的(這依賴於用例),我們需要去用一個不同的監管者重寫這個行為。

public class Supervisor2 extends UntypedActor {

private static SupervisorStrategy strategy = new OneForOneStrategy(10,
Duration.create("1 minute"),
new Function<Throwable, Directive>() {
@Override
public Directive apply(Throwable t) {
if (t instanceof ArithmeticException) {
return resume();
} else if (t instanceof NullPointerException) {
return restart();
} else if (t instanceof IllegalArgumentException) {
return stop();
} else {
return escalate();
}
}
});

@Override
public SupervisorStrategy supervisorStrategy() {
return strategy;
}

public void onReceive(Object o) {
if (o instanceof Props) {
getSender().tell(getContext().actorOf((Props) o), getSelf());
} else {
unhandled(o);
}
}

@Override
public void preRestart(Throwable cause, Option<Object> msg) {
// do not kill all children, which is the default here
}
}

通過父親,孩子角色倖存向上升級重啟,如最後一段測試程式碼所演示的:

superprops = Props.create(Supervisor2.class);
supervisor = system.actorOf(superprops);
child = (ActorRef) Await.result(ask(supervisor,
Props.create(Child.class), 5000), timeout);
child.tell(23, ActorRef.noSender());
assert Await.result(ask(child, "get", 5000), timeout).equals(23);
child.tell(new Exception(), ActorRef.noSender());
assert Await.result(ask(child, "get", 5000), timeout).equals(0);