Akka工具(二)—Future
Future被Akka設計用來處理併發執行結果的資料結構,我們可以通過同步(阻塞)或非同步(非阻塞)的方法接受結果。使用Future,我們可以在Actor外部獲取某個Actor的訊息,在介紹Actor訊息傳送方式時,我們進行過簡單介紹,現在我們先來回憶一下Future的常規用法。
Future訊息處理
ask傳送訊息會返回一個Future物件,通過該物件我們可以同步或非同步方式處理Actor返回的結果訊息。同步方式主要依賴Await類的方法阻塞等待返回值,非同步方式則依靠Future物件提供的諸多回調方法。使用ask方法時,我們最好設定一個超時時間,否則一直阻塞嚴重影響系統的吞吐率和效能。
同步方式程式碼示例:
public class FutureActor extends AbstractActor { public static void main(String[] args) { ActorSystem system = ActorSystem.create("system"); ActorRef futureActor = system.actorOf(Props.create(FutureActor.class), "futureActor"); Timeout timeout = new Timeout(Duration.create(2, TimeUnit.SECONDS)); Future<Object> future = Patterns.ask(futureActor, "wait access message", timeout); try { String reply = (String) Await.result(future, timeout.duration()); System.out.println("回覆的訊息: " + reply); } catch (Exception e) { e.printStackTrace(); } } @Override public Receive createReceive() { return receiveBuilder().matchAny(other -> { System.out.println("接受的訊息:" + other); //模擬超時操作 // Thread.sleep(3000); getSender().tell("access", getSelf()); }).build(); } }
我們首先定義Timeout用於限制超時,然後使用Patterns.ask給futureActor傳送非同步請求,返回一個Future物件,該物件封裝了futureActor返回的訊息,之前我們說過處理訊息有兩種方式:同步和非同步,程式碼中採用Await.result(同步方式),表明我們會在獲取訊息上阻塞,直到訊息返回才會繼續往下執行。這裡我們使用sleep方法模擬服務超時,程式會丟擲TimeoutException。
java.util.concurrent.TimeoutException: Futures timed out after [2 seconds] at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:223) at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:227) at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190) at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) at scala.concurrent.Await$.result(package.scala:190) at scala.concurrent.Await.result(package.scala) at com.release.util.akka.Future.FutureActor.main(FutureActor.java:26)
在實際專案中,如果ask請求結果不影響程式繼續執行,為了儘可能提高程式響應速度,我們或許更喜歡非同步方式處理訊息,Future類提供了諸多回調函式,例如:onComplete(處理執行完成)、onSuccess(處理執行成功)、onFailure(處理執行失敗),修改上述示例:
future.onComplete(new OnComplete<Object>() {
@Override
public void onComplete(Throwable throwable, Object o) throws Throwable {
if (throwable != null) {
System.out.println("返回結果異常:" + throwable.getMessage());
} else {
System.out.println("返回訊息:" + o);
}
}
}, system.dispatcher());
// 成功,執行過程
future.onSuccess(new OnSuccess<Object>() {
@Override
public void onSuccess(Object msg) throws Throwable {
System.out.println("回覆的訊息:" + msg);
}
}, system.dispatcher());
//失敗,執行過程
future.onFailure(new OnFailure() {
@Override
public void onFailure(Throwable throwable) throws Throwable {
if (throwable instanceof TimeoutException) {
System.out.println("服務超時");
} else {
System.out.println("未知錯誤");
}
}
}, system.dispatcher());
}
當futureActor及時返回訊息,onSuccess方法會執行並接受返回的訊息;當程式執行超時或其它錯誤,會呼叫onFailure方法並得到異常;大家執行可以發現,無論成功與否,onComplete方法都會呼叫,它相當於onSuccess和onFailure方法整合。
函式式Future
相信使用過Jdk1.8或以上的朋友,都知道函數語言程式設計的好處,呼叫清晰、處理簡單,方便我們對各類資料進行分析和調整。在這裡Future也提供了很多好用的函式式API,方便我們對資料處理。
map函式
當我們需要對返回的資料進行處理,並需要返回新的結果時,可以使用map函式。對資料進行處理後,map函式會返回一個新的Future,不影響之前Future的值,是一個全新的Future。例如:
//對actor返回的訊息進行處理,返回新的future
Future<String> map = future.map(new Mapper<Object, String>() {
@Override
public String apply(Object msg) {
return "獲得futureActor的結果:" + msg;
}
}, system.dispatcher());
try {
String reply = Await.result(map, timeout.duration());
System.out.println(reply);
} catch (Exception e) {
e.printStackTrace();
}
使用map函式,需要new一個Mapper物件,該物件需要兩個引數,第一個代表輸入型別,第二個代表處理後輸出的型別,與apply方法的入參和返回值相對應。
map函式不僅可以作用於一個Future物件,map函式還可以作用於多個Future物件,當我們獲取到一系列Future之後,我們可以把它們組合成一個列表,利用map函式進行計算。例如:統計一個班級上姓李的同學數量。
示例:
public class MapActor extends AbstractActor {
public static void main(String[] args) {
String[] names = {"張三", "李四", "王五", "李春花", "李貴妃"};
ActorSystem system = ActorSystem.create();
Timeout timeout = new Timeout(Duration.create(3, TimeUnit.SECONDS));
List<Future<Integer>> futures = new ArrayList<>();
for (int i = 0; i < names.length; i++) {
ActorRef ref = system.actorOf(Props.create(MapActor.class), "mapActor" + i);
Future future = Patterns.ask(ref, names[i], timeout);
futures.add(future);
}
//把Future列表轉換為帶有結果列表的Future物件
Future<Iterable<Integer>> iterableFuture = Futures.sequence(futures, system.dispatcher());
Future<Integer> map = iterableFuture.map(new Mapper<Iterable<Integer>, Integer>() {
@Override
public Integer apply(Iterable<Integer> parameter) {
Integer count = 0;
for (Integer o : parameter) {
count += o;
}
return count;
}
}, system.dispatcher());
try {
Integer reply = Await.result(map, timeout.duration());
System.out.println("李姓數量: " + reply);
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public Receive createReceive() {
return receiveBuilder().match(String.class, s -> getSender().tell(s.startsWith("李") ? 1 : 0, getSelf())).build();
}
}
上述過程中,我們先使用sequence讓List集合轉換為帶有結果列表的Future物件,因為List集合是不能直接使用map方法的,畢竟map方法不屬於List,之後使用map方法遍歷結果計算,最終返回一個帶有我們所需結果的新Future物件。
fold函式
我們除了使用map函式統計之外,還可以使用fold函式,fold與map函式有所不同,fold函式提供了一個初始值,當我們傳一個帶有Future的列表時,fold會先使用初始值和列表的第一個值計算,然後把計算的值作為下一次的初始值,以此類推。修改上述程式碼:
Future<Integer> fold = Futures.fold(0, futures, new Function2<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer v1, Integer v2) throws Exception {
System.out.println(v1+"--->"+v2);
return v1 + v2;
}
}, system.dispatcher());
輸出結果:
0--->0
0--->1
1--->0
1--->1
2--->1
李姓數量: 3
當然這裡完全可以不需要初始值,所以我們也可以使用reduce函式,使用方式和fold類似,如下:
Future<Integer> fold = Futures.reduce( futures, new Function2<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer v1, Integer v2) throws Exception {
System.out.println(v1+"--->"+v2);
return v1 + v2;
}
}, system.dispatcher());
總結
使用Future可以讓我們在Actor外部獲取結果,並且可以選擇獲取結果的方式:非同步或者同步,儘可能的利用非同步方式獲取結果,可以提高系統的吞吐率和響應速度。另外,使用Future,我們還可以使用它所提供的諸多函式式API,方便我們對資料進行處理或計算。