1. 程式人生 > >《Java高併發程式設計》學習 --7.10 多個Actor同時修改資料:Agent

《Java高併發程式設計》學習 --7.10 多個Actor同時修改資料:Agent

在Actor的程式設計模型中,Actor之間主要通過訊息進行資訊傳遞。因此,很少發生多個Actor需要訪問一個共享變數的情況。但在實際開發中,這種情況很難完全避免。如果多個Actor需要對同一個共享變數進行讀寫時,如何保證執行緒安全呢? 在Akka中,使用一種叫做Agent的元件來實現這個功能。一個Agent提供了對一個變數的非同步更新。當一個Actor希望改變Agent的值時,它會向這個Agent下發一個動作(action)。當多個Actor同時改變Agent時,這些action將會在ExecutionContext中被併發排程執行。在任何時刻,一個Agent最多隻能執行一個action,對於某一個執行緒來說,它執行action的順序與它的發生順序一致,但對於不同執行緒來說,這些action可能會交織在一起。 Agent的修改可以使用兩個方法send()或者alter()。它們都可以向Agent傳送一個修改動作。但是send()方法沒有返回值,而alter()方法會返回一個Future物件便於跟蹤Agent的執行。 下面模擬一個場景:有10個Actor,它們一起對一個Agent執行累加操作,每個agent累加10000次,如果沒有意外,那麼agent最終的值將是100000,如果Actor間的排程出現問題,那麼這個值可能小於100000。
public class CounterActor extends UntypedActor {
	Mapper<Integer, Integer> addMapper = new Mapper<Integer, Integer>() {
		public Integer apply(Integer i) {
			return i + 1;
		};
	};
	@Override
	public void onReceive(Object msg) throws Exception {
		if(msg instanceof Integer) {
			for(int i=0; i<10000; i++) {
				Future<Integer> f = AgentDemo.counterAgent.alter(addMapper);
				AgentDemo.futures.add(f);
			}
			getContext().stop(getSelf());
		} else 
			unhandled(msg);
	}
}
上述程式碼定義了一個累加的Actor:CounterActor,且定義了累計動作 action addMapper。它的作用是對Agent的值進行修改。 CounterActor的訊息處理函式onReceive()中,對全域性counterAgent進行累加操作,alter()指定了累加動作addMapper。由於我們希望在將來知道累加行為是否完成,因此在這裡將返回Future物件進行收集。完成任務後,Actor自行退出。 程式的主函式如下:
public class AgentDemo {
	public static Agent<Integer> counterAgent = Agent.create(0, ExecutionContexts.global());
	static ConcurrentLinkedDeque<Future<Integer>> futures = new ConcurrentLinkedDeque<Future<Integer>>();

	public static void main(String[] args) {
		final ActorSystem system = ActorSystem.create("agentdemo", ConfigFactory.load("samplehello.conf"));
		ActorRef[] counter = new ActorRef[10];
		for(int i=0; i<counter.length; i++) {
			counter[i] = system.actorOf(Props.create(CounterActor.class),"counter_" + i);
		}
		final Inbox inbox = Inbox.create(system);
		for(int i=0; i<counter.length; i++) {
			inbox.send(counter[i], 1);
			inbox.watch(counter[i]);
		}

		int closeCount = 0;
                //等待所有Actor全部結束
		while(true) {
			Object msg = inbox.receive(Duration.create(1, TimeUnit.SECONDS));
			if(msg instanceof Terminated) {
				closeCount++;
				if(closeCount == counter.length) {
					break;
				}
			} else {
				System.out.println(msg);
			}
		}
		//等待所有的累加執行緒完成,因為他們都是非同步的
		Futures.sequence(futures, system.dispatcher()).onComplete(
				new OnComplete<Iterable<Integer>>() {
					@Override
					public void onComplete(Throwable arg0,
							Iterable<Integer> arg1) throws Throwable {
						System.out.println("counterAgent=" + counterAgent.get());
						system.shutdown();
					}
				}, system.dispatcher());
	}
}
上述程式碼中,建立了10個CounterActor物件。使用Inbox與CounterActor進行通訊。第14行的訊息將觸發CounterActor進行累加操作。第20~30行系統將等待所有10個CounterActor執行結束。執行完成後,我們便已經收集了所有的Future。在第32行,將所有的Future進行序列組合(使用sequence()方法),構造了一個整體的Future,併為它建立onComplete()回撥函式。在所有的Agent操作執行完成後,onComplete()方法就會被呼叫(第35行)。在這個例子中,簡單地輸出最終的counterAgent值,並關閉系統。 執行上述程式,得到結果:
counterAgent=100000