1. 程式人生 > >Akka工具(二)—Future

Akka工具(二)—Future

       Future被Akka設計用來處理併發執行結果的資料結構,我們可以通過同步(阻塞)或非同步(非阻塞)的方法接受結果。使用Future,我們可以在Actor外部獲取某個Actor的訊息,在介紹Actor訊息傳送方式時,我們進行過簡單介紹,現在我們先來回憶一下Future的常規用法。

    Future訊息處理

       ask傳送訊息會返回一個Future物件,通過該物件我們可以同步或非同步方式處理Actor返回的結果訊息。同步方式主要依賴Await類的方法阻塞等待返回值,非同步方式則依靠Future物件提供的諸多回調方法。使用ask方法時,我們最好設定一個超時時間,否則一直阻塞嚴重影響系統的吞吐率和效能。

同步方式程式碼示例:

public class FutureActor extends AbstractActor {

    public static void main(String[] args) {
        ActorSystem system = ActorSystem.create("system");
        ActorRef futureActor = system.actorOf(Props.create(FutureActor.class), "futureActor");
        Timeout timeout = new Timeout(Duration.create(2, TimeUnit.SECONDS));
        Future<Object> future = Patterns.ask(futureActor, "wait access message", timeout);
        try {
            String reply = (String) Await.result(future, timeout.duration());
            System.out.println("回覆的訊息: " + reply);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public Receive createReceive() {
        return receiveBuilder().matchAny(other -> {
            System.out.println("接受的訊息:" + other);
            //模擬超時操作
            // Thread.sleep(3000);
            getSender().tell("access", getSelf());
        }).build();
    }
}

       我們首先定義Timeout用於限制超時,然後使用Patterns.ask給futureActor傳送非同步請求,返回一個Future物件,該物件封裝了futureActor返回的訊息,之前我們說過處理訊息有兩種方式:同步和非同步,程式碼中採用Await.result(同步方式),表明我們會在獲取訊息上阻塞,直到訊息返回才會繼續往下執行。這裡我們使用sleep方法模擬服務超時,程式會丟擲TimeoutException。

java.util.concurrent.TimeoutException: Futures timed out after [2 seconds]
    
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:223)

    at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:227)

    at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)

    at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)

    at scala.concurrent.Await$.result(package.scala:190)

    at scala.concurrent.Await.result(package.scala)

    at com.release.util.akka.Future.FutureActor.main(FutureActor.java:26)

       在實際專案中,如果ask請求結果不影響程式繼續執行,為了儘可能提高程式響應速度,我們或許更喜歡非同步方式處理訊息,Future類提供了諸多回調函式,例如:onComplete(處理執行完成)、onSuccess(處理執行成功)、onFailure(處理執行失敗),修改上述示例:

     future.onComplete(new OnComplete<Object>() {
            @Override
            public void onComplete(Throwable throwable, Object o) throws Throwable {
                if (throwable != null) {
                    System.out.println("返回結果異常:" + throwable.getMessage());
                } else {
                    System.out.println("返回訊息:" + o);
                }
            }
        }, system.dispatcher());
        // 成功,執行過程
        future.onSuccess(new OnSuccess<Object>() {
            @Override
            public void onSuccess(Object msg) throws Throwable {
                System.out.println("回覆的訊息:" + msg);
            }
        }, system.dispatcher());
        //失敗,執行過程
        future.onFailure(new OnFailure() {
            @Override
            public void onFailure(Throwable throwable) throws Throwable {
                if (throwable instanceof TimeoutException) {
                    System.out.println("服務超時");
                } else {
                    System.out.println("未知錯誤");
                }
            }
        }, system.dispatcher());

    }

        當futureActor及時返回訊息,onSuccess方法會執行並接受返回的訊息;當程式執行超時或其它錯誤,會呼叫onFailure方法並得到異常;大家執行可以發現,無論成功與否,onComplete方法都會呼叫,它相當於onSuccess和onFailure方法整合。

     函式式Future

        相信使用過Jdk1.8或以上的朋友,都知道函數語言程式設計的好處,呼叫清晰、處理簡單,方便我們對各類資料進行分析和調整。在這裡Future也提供了很多好用的函式式API,方便我們對資料處理。

     map函式

       當我們需要對返回的資料進行處理,並需要返回新的結果時,可以使用map函式。對資料進行處理後,map函式會返回一個新的Future,不影響之前Future的值,是一個全新的Future。例如:

//對actor返回的訊息進行處理,返回新的future
Future<String> map = future.map(new Mapper<Object, String>() {
    @Override
    public String apply(Object msg) {
        return "獲得futureActor的結果:" + msg;
    }
}, system.dispatcher());
try {
    String reply = Await.result(map, timeout.duration());
    System.out.println(reply);
} catch (Exception e) {
    e.printStackTrace();
}

        使用map函式,需要new一個Mapper物件,該物件需要兩個引數,第一個代表輸入型別,第二個代表處理後輸出的型別,與apply方法的入參和返回值相對應。

        map函式不僅可以作用於一個Future物件,map函式還可以作用於多個Future物件,當我們獲取到一系列Future之後,我們可以把它們組合成一個列表,利用map函式進行計算。例如:統計一個班級上姓李的同學數量。

示例:

public class MapActor extends AbstractActor {
    public static void main(String[] args) {
        String[] names = {"張三", "李四", "王五", "李春花", "李貴妃"};
        ActorSystem system = ActorSystem.create();
        Timeout timeout = new Timeout(Duration.create(3, TimeUnit.SECONDS));
        List<Future<Integer>> futures = new ArrayList<>();
        for (int i = 0; i < names.length; i++) {
            ActorRef ref = system.actorOf(Props.create(MapActor.class), "mapActor" + i);
            Future future = Patterns.ask(ref, names[i], timeout);
            futures.add(future);
        }
        //把Future列表轉換為帶有結果列表的Future物件
        Future<Iterable<Integer>> iterableFuture = Futures.sequence(futures, system.dispatcher());
        Future<Integer> map = iterableFuture.map(new Mapper<Iterable<Integer>, Integer>() {
            @Override
            public Integer apply(Iterable<Integer> parameter) {
                Integer count = 0;
                for (Integer o : parameter) {
                    count += o;
                }
                return count;
            }
        }, system.dispatcher());
        try {
            Integer reply = Await.result(map, timeout.duration());
            System.out.println("李姓數量: " + reply);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public Receive createReceive() {
        return receiveBuilder().match(String.class, s -> getSender().tell(s.startsWith("李") ? 1 : 0, getSelf())).build();
    }
}

       上述過程中,我們先使用sequence讓List集合轉換為帶有結果列表的Future物件,因為List集合是不能直接使用map方法的,畢竟map方法不屬於List,之後使用map方法遍歷結果計算,最終返回一個帶有我們所需結果的新Future物件。

     fold函式

       我們除了使用map函式統計之外,還可以使用fold函式,fold與map函式有所不同,fold函式提供了一個初始值,當我們傳一個帶有Future的列表時,fold會先使用初始值和列表的第一個值計算,然後把計算的值作為下一次的初始值,以此類推。修改上述程式碼:

       Future<Integer> fold = Futures.fold(0, futures, new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer apply(Integer v1, Integer v2) throws Exception {
                System.out.println(v1+"--->"+v2);
                return v1 + v2;
           }
       }, system.dispatcher());

       輸出結果:

0--->0 
0--->1 
1--->0 
1--->1 
2--->1
李姓數量: 3

       當然這裡完全可以不需要初始值,所以我們也可以使用reduce函式,使用方式和fold類似,如下:

       Future<Integer> fold = Futures.reduce( futures, new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer apply(Integer v1, Integer v2) throws Exception {
                System.out.println(v1+"--->"+v2);
                return v1 + v2;
            }
        }, system.dispatcher());

    總結

       使用Future可以讓我們在Actor外部獲取結果,並且可以選擇獲取結果的方式:非同步或者同步,儘可能的利用非同步方式獲取結果,可以提高系統的吞吐率和響應速度。另外,使用Future,我們還可以使用它所提供的諸多函式式API,方便我們對資料進行處理或計算。