1. 程式人生 > >討喜的隔離可變性(三)建立角色

討喜的隔離可變性(三)建立角色

宣告:本文是《Java虛擬機器併發程式設計》的第五章,感謝華章出版社授權併發程式設計網站釋出此文,禁止以任何形式轉載此文。

正如前面曾經提到過的那樣,雖然我們有很多支援角色的類庫可供選擇,但是在本書中我們將使用Akka。這是一個基於Scala的類庫,該類庫擁有非常好的效能和可擴充套件性、並同時支援角色和STM。此外,該類庫還可以被用於多種JVM上的語言中。在本章中,我們將注意力集中在Java和Scala身上。而在下一章,我們將會學習如何在其他語言中使用Akka的角色。

 某個角色的生命週期

圖 8‑2 某個角色的生存週期

由於Akka是用Scala實現的,所以在Scala中建立和使用角色非常簡單並且更加自然,從Akka API的實現裡我們也可以看到Scala簡約而不簡單的風格閃耀其中。除此之外,Akka的開發者們還設計了一套相當出色的傳統Java API,可以使我們在Java程式碼中很方便地建立和使用角色。下面我們將先學習如何在Java中使用這套API,然後再體驗一下用Scala時將有著怎樣的簡化和改變。

用Java建立角色

在Akka中,抽象類akka.actor.UntypedActor用於表示一個角色的抽象表示,而具體的角色定義則只需簡單繼承這個抽象類並實現其onReceive()函式就可以了——每當有訊息到達此角色時該函式將被呼叫。下面讓我們通過一個簡單的例項來對上述過程建立一個直觀感受。下面我們將會建立一個角色(actor)…不如就寫一個可以對扮演不同熒幕人物(role)的請求進行響應的HollywoodActor咋樣?

<br />
public class HollywoodActor extends UntypedActor {<br />
public void onReceive(final Object role) {<br />
System.out.println(&quot;Playing &quot; + role +<br />
&quot; from Thread &quot; + Thread.currentThread().getName());<br />
}<br />
}<br />

如上所示,onReceive()函式接受一個Object物件作為其引數。在本例中,我們只是簡單地將該引數以及負責處理訊息的執行緒的詳情打印出來。稍後我們將會學習如何處理不同型別的訊息。

在完成了角色(actor)的定義之後,我們還需要建立一個角色的例項,並將該角色(actor)曾經演過的熒幕人物(role)以訊息的形式傳送給它,下面讓我們來實現這部分內容:

<br />
public class UseHollywoodActor {<br />
public static void main(final String[] args) throws InterruptedException {<br />
final ActorRef johnnyDepp = Actors.actorOf(HollywoodActor.class).start();<br />
johnnyDepp.sendOneWay(&quot;Jack Sparrow&quot;);<br />
Thread.sleep(100);<br />
johnnyDepp.sendOneWay(&quot;Edward Scissorhands&quot;);<br />
Thread.sleep(100);<br />
johnnyDepp.sendOneWay(&quot;Willy Wonka&quot;);<br />
Actors.registry().shutdownAll();<br />
}<br />
}<br />

在Java中我們通常都是用new來建立物件的,但由於Akka的角色並非簡單物件而是活動物件(active objects),所以我們需要用一個特殊函式actorOf()來完成建立動作。此外,我們還可以先用new生成一個例項,然後再呼叫actorOf()對該例項進行封裝以獲得一個角色的引用,關於這種建立方式我們稍後會再研究具體細節。當我們建立好了角色之後,就可以通過呼叫其start()函式來啟動該角色。而當我們啟動一個角色時,Akka會將其寫入一個登錄檔(registry)中,於是在這個角色停止執行之前我們都可以通過登錄檔來訪問它。在本例中,johnnyDeep即為角色例項的引用,其型別為ActorRef。

接下來,我們通過sendOneWay()函式向johnnyDeep傳送了一些附帶著我們希望其扮演的熒幕人物(role)的訊息。當訊息發出之後,其實我們本不用加入那幾個100毫秒等待時間的,但插入延時將有助於我們更好地學習角色如何進行執行緒切換的運作細節。在程式碼的結尾處,我們關閉了所有執行中的角色。除了程式碼示例中所使用的shutdownAll()之外,我們還可以逐個呼叫每個角色的stop()函式或給所有角色傳送kill訊息的方式來達到關停所有角色的目的。

為了能夠執行上面的例項,我們需要先把Akka的庫檔案都新增到classpath中,然後通過javac對程式碼進行編譯。編譯完成之後,我們就可以像執行其他常規Java程式一樣執行本節的示例程式。需要再次提醒你的是,請務必記得將所有相關的JAR都新增到classpath中。下面就是我在我的系統上所使用的編譯和執行指令:

<br />
javac -d . -classpath $AKKA_JARS HollywoodActor.java UseHollywoodActor.java<br />
java -classpath $AKKA_JARS com.agiledeveloper.pcj.UseHollywoodActor<br />

其中AKKA_JARS的定義如下所示:

<br />
export AKKA_JARS=&quot;$AKKA_HOME/lib/scala-library.jar:\<br />
$AKKA_HOME/lib/akka/akka-stm-1.1.3.jar:\<br />
$AKKA_HOME/lib/akka/akka-actor-1.1.3.jar:\<br />
$AKKA_HOME/lib/akka/multiverse-alpha-0.6.2.jar:\<br />
$AKKA_HOME/lib/akka/akka-typed-actor-1.1.3.jar:\<br />
$AKKA_HOME/lib/akka/aspectwerkz-2.2.3.jar:\<br />
$AKKA_HOME/config:\<br />
.&quot;<br />

為了使例項程式碼能否順利地編譯執行,請根據你所使用的作業系統來定義AKKA_JARS環境變數,以便編譯器能夠正確定位到Scala和Akka的安裝路徑。其中,scala-library.jar是scala相關的功能集合,而我們既可以使用Akka自帶的jar,也可以使用Scala安裝路徑下的那一份。

預設情況下Akka會將額外的日誌訊息輸出到控制檯,關於如何對這一行為進行配置請參閱6.8節。

下面讓我們編譯並執行示例程式碼,並觀察角色對於訊息的響應情況:

<br />
Playing Jack Sparrow from Thread akka:event-driven:dispatcher:global-1<br />
Playing Edward Scissorhands from Thread akka:event-driven:dispatcher:global-2<br />
Playing Willy Wonka from Thread akka:event-driven:dispatcher:global-3<br />

通過輸出結果我們可以看到,示例角色每次只響應一個訊息,並且每次執行角色的執行緒都是不同的。對於訊息處理的過程而言,既可以一個執行緒處理多個訊息,也可以像本例這樣由不同執行緒處理不同的訊息——但無論是哪種處理模式,在任意時刻都只能有一個訊息被處理。該模式的關鍵點在於,所有角色都是單執行緒的,但是在陷入等待狀態時角色會優雅地將執行緒釋放而不是抓住執行緒不撒手。我們在傳送訊息之後插入的sleep語句的目的就是為了將actor引入等待狀態以便更清晰地演示這一運作細節。

上例中,我們建立角色時沒有帶任何構造函參。而如果需要的話,我們可以在角色的建立過程中引入一些引數。例如,我們可以用好萊塢演員的名字來初始化之前的HollywoodActor:

<br />
public class UseHollywoodActor {<br />
public static void main(final String[] args) throws InterruptedException {<br />
final ActorRef tomHanks = Actors.actorOf(new UntypedActorFactory() {<br />
public UntypedActor create() { return new HollywoodActor(&quot;Hanks&quot;); }<br />
}).start();<br />
tomHanks.sendOneWay(&quot;James Lovell&quot;);<br />
tomHanks.sendOneWay(new StringBuilder(&quot;Politics&quot;));<br />
tomHanks.sendOneWay(&quot;Forrest Gump&quot;);<br />
Thread.sleep(1000);<br />
tomHanks.stop();<br />
}<br />
}<br />

新版的HollywoodActor類的建構函式定義了一個名為name的String型別引數。而在onReceive()函式中,我們對於不能識別的訊息進行了專門的處理,即簡單地在螢幕輸出該好萊塢演員未曾飾演過那個未識別的訊息所代表的熒幕人物(role)。當然我們也可以採取其他動作,比如返回一個錯誤碼、打日誌、向上層呼叫者拋異常等等。下面讓我們看看如何將給這個建構函式傳遞引數:

<br />
public class HollywoodActor extends UntypedActor {<br />
private final String name;<br />
public HollywoodActor(final String theName) { name = theName; }<br />
public void onReceive(final Object role) {<br />
if(role instanceof String)<br />
System.out.println(String.format(&quot;%s playing %s&quot;, name, role));<br />
else<br />
System.out.println(name + &quot; plays no &quot; + role);<br />
}<br />
}<br />

一般情況下,我們都是通過傳送訊息而不是直接呼叫函式的方式與角色進行互動的。Akka不希望我們拿到角色的直接引用,而是希望我們只針對ActorRef的引用進行操作。這樣一來,Akka就可以確保我們不會往角色裡新增其他函式,並且也不會與角色例項進行直接的互動。直接操縱角色例項的行為會將我們帶回到共享可變性的泥淖中,而這正是我們極力想要避免。此外,這種受控的角色建立方式也便於Akka更好地回收廢棄的角色。所以如果我們試圖直接建立一個角色類的例項,Akka將丟擲一個內容為“請不要用’new’操作符顯示地建立角色例項”的akka.actor.ActorInitializationException異常。

Akka允許我們以一種受控的方式建立角色例項,即我們可以在一個匿名類中實現UntypedActorFactory介面,並在其create()函式中實現建立角色例項的邏輯。而接下來的actorOf()則把一個繼承自UntypedActor的普通物件轉換為為一個Akka角色。隨後,我們和之前一樣向這個actor傳送幾條訊息並觀察輸出結果。

在本例中,HollywoodActor只接受String型別的訊息,但我們在測試用例中向其傳送了一條值為Politics、型別為StringBuilder的訊息。而我們在onReceive()函式中設計的檢查邏輯將會發現並處理這一情況。最後,我們會呼叫stop()函式來終止角色的執行。程式碼結尾處插入sleep(1000)的目的是為了讓角色在結束之前有機會響應所有未處理的訊息。最終的輸出結果如下所示:

<br />
Hanks playing James Lovell<br />
Hanks plays no Politics<br />
Hanks playing Forrest Gump<br />

用Scala建立角色

在Scala中建立Akka角色時,我們沒有像在Java版本中那樣繼承UntypedActor類,而是要繼承Actor trait並實現receive()函式。下面讓我們用Scala來實現之前剛剛用Java寫過的HollywoodActor類:

<br />
class HollywoodActor extends Actor {<br />
def receive = {<br />
case role =&gt;<br />
println(&quot;Playing &quot; + role +<br />
&quot; from Thread &quot; + Thread.currentThread().getName())<br />
}<br />
}<br />

在上面的程式碼中,receive()函式實現了一個PartialFunction並採用了Scala模式匹配的形式,但為了避免分散注意力我們現在先忽略這些細節。當有訊息到達時,receive()函式將被呼叫;如果對Scala語法還不熟悉的話,你可以暫時先把receive()函式想象成一個大的switch語句,其實現的功能與Java版本是完全相同的。

至此我們已經看到了如何定義一個角色,下面讓我們把注意力集中到角色的使用上面:

<br />
object UseHollywoodActor {<br />
def main(args : Array[String]) :Unit = {<br />
val johnnyDepp = Actor.actorOf[HollywoodActor].start()<br />
johnnyDepp ! &quot;Jack Sparrow&quot;<br />
Thread.sleep(100)<br />
johnnyDepp ! &quot;Edward Scissorhands&quot;<br />
Thread.sleep(100)<br />
johnnyDepp ! &quot;Willy Wonka&quot;<br />
Actors.registry.shutdownAll<br />
}<br />
}<br />

Actor類的actorOf()函式有多個過載定義,這裡我們所採用的是接受一個角色類名(即程式碼中的 [HollywoodActor])作為其引數的版本。在角色被創建出來之後,我們隨即通過呼叫start()函式將其啟動。在本例中,ActorRef型別的變數johnnyDepp即為我們所建立的角色例項的引用。由於Scala可以進行型別推斷,所以我們可以不必在程式碼中明確指定johnnyDepp的型別。

接下來,我們給johnnyDepp傳送了3個附帶著我們希望其扮演的熒幕人物的訊息。噢,稍等一下,這裡有一個細節請你注意,即我們是通過特殊函式!來發送訊息的。當你見到actor!message時,請從右向左閱讀這個語句,就能明白這條語句的意思是把訊息傳送給指定的角色。這處細節再次展現了Scala在語法方面的簡潔與優雅。通過這種方式,我們就無需再將傳送訊息的語句寫成actor.!(message),而是簡單地將句點和括號拿掉,簡寫成actor!message就行了。如果我們更喜歡Java裡傳送訊息的那個函式,那麼我們也可以把Scala簡潔的語法用在Java風格的函式上,即把語句寫成actor sendOneWay message。上面示例中餘下的程式碼與之前Java版本的示例完全相同,這裡就不再贅述。

下面我們將通過scalac編譯器對上述程式碼進行編譯,但首先請務必記住要把Akka庫檔案新增到classpath中。編譯完成後,我們就可以像之前執行普通Java程式那樣執行上面的scala示例程式。需要再次提醒你的是,請務必記得將所需的JARs加入到你係統的classpath中。下面是我在我的系統上所使用的編譯和執行指令,請你根據你係統中Scala和Akka的安裝目錄來自行調整classpath中相關的路徑資訊:

<br />
scalac -classpath $AKKA_JARS HollywoodActor.scala UseHollywoodActor.scala<br />
java -classpath $AKKA_JARS com.agiledeveloper.pcj.UseHollywoodActor<br />

如果我們想要禁止日誌訊息輸出到控制檯的話,請參閱6.8節中的相關內容。在將上述示例程式碼編譯並執行之後,我們可以看到其輸出結果與之前的Java版本是非常相似的:

<br />
class HollywoodActor(val name : String) extends Actor {<br />
def receive = {<br />
case role : String =&gt; println(String.format(&quot;%s playing %s&quot;, name, role))<br />
case msg =&gt; println(name + &quot; plays no &quot; + msg)<br />
}<br />
}<br />

如果想在建立角色時傳些引數給它,如好萊塢演員的名字等,你會發現用Scala來實現會比之前的Java版本簡單很多。下面讓我們先對HollywoodActor類進行改造,以使其可以接受構造函參:

<br />
class HollywoodActor(val name : String) extends Actor {<br />
def receive = {<br />
case role : String =&gt; println(String.format(&quot;%s playing %s&quot;, name, role))<br />
case msg =&gt; println(name + &quot; plays no &quot; + msg)<br />
}<br />
}<br />

如上所示,新版本的HollywoodActor類接受一個名為name的String型別的構造函參。而在receive()函式中,我們對於格式無法識別的訊息做了專門的處理。在Scala中我們無需再使用instanceof,receive()函式中的case語句即可實現訊息與各種模式之間的匹配——在本例中特指訊息型別的匹配。

我們用Java建立接受一個構造函參的角色時還是花了不少力氣的,但在Scala中一切變得如此簡單:

<br />
object UseHollywoodActor {<br />
def main(args : Array[String]) : Unit = {<br />
val tomHanks = Actor.actorOf(new HollywoodActor(&quot;Hanks&quot;)).start()<br />
tomHanks ! &quot;James Lovell&quot;<br />
tomHanks ! new StringBuilder(&quot;Politics&quot;)<br />
tomHanks ! &quot;Forrest Gump&quot;<br />
Thread.sleep(1000)<br />
tomHanks.stop()<br />
}<br />
}<br />

在上面的程式碼中,我們先用new關鍵字對角色進行初始化,隨後又將例項化好的物件傳給actorOf()函式(這是由於Akka禁止在actorOf()函式之外隨意地建立actor例項)。通過這一動作,我們就將一個繼承自Actor的普通物件轉換成了一個Akka角色。接下來,我們同樣會給新建立的角色傳送3條訊息。剩下的程式碼與Java版本非常相似,這裡就不再贅述。最後讓我們執行上述示例程式碼,並確認其輸出與Java版本是否相同:

<br />
Hanks playing James Lovell<br />
Hanks plays no Politics<br />
Hanks playing Forrest Gump<br />


方 騰飛

花名清英,併發網(ifeve.com)創始人,暢銷書《Java併發程式設計的藝術》作者,螞蟻金服技術專家。目前工作於支付寶微貸事業部,關注網際網路金融,併發程式設計和敏捷實踐。微信公眾號aliqinying。