【翻譯】- EffectiveAkka-第二章(一)
Actor使用模式
現在我們已經了解了可以創建的actor系統的不同類型,那麽我們在編寫基於actor的應用程序時,可以采用什麽樣的使用模式,以便避免出現常見錯誤呢? 下面就讓我們看看其中使用模式。
Extra模式
異步編程中最困難的任務之一就是嘗試捕獲上下文,以便在任務開始時可以準確地表示任務完成時的世界狀態。 但是,創建Akka actors的匿名實例是一種非常簡單且輕量級的解決方案,用於在處理消息時捕獲上下文以在任務成功完成時使用。 它們就像電影演員中的演員一樣 - 幫助為正在他們身邊工作的主要演員提供真實情境。
問題
一個很好的例子是一個actor順序處理郵箱中的消息,但是用Future在額外的線程中處理基於這些消息的任務。 這是設計actor的好方法,因為他們不會阻塞響應,可以同時處理更多的消息並提高應用程序的性能。 但是,actor的狀態可能隨每條消息而改變。
我們來定義這個例子的樣板。 這些類將在我們開發過程的每個叠代中重用。 請註意,這些代碼都可以在我的GitHub倉庫中找到,你可以克隆並測試它們。首先,我們有一條消息告訴actor檢索特定ID客戶的賬戶余額:
case class GetCustomerAccountBalances(id: Long)
接下來,我們有返回請求的帳戶信息的數據傳輸對象。 由於客戶可能沒有任何類型的賬戶,也可能有多於一種賬戶類型,所以我們在這種情況下返回Option [List [(Long,BigDecimal)]],其中Long代表 一個賬戶標識符,BigDecimal代表一個余額:
case class AccountBalances( val checking: Option[List[(Long, BigDecimal)]], val savings: Option[List[(Long, BigDecimal)]], val moneyMarket: Option[List[(Long, BigDecimal)]]) case class CheckingAccountBalances( val balances: Option[List[(Long, BigDecimal)]]) case class SavingsAccountBalances( val balances: Option[List[(Long, BigDecimal)]]) case class MoneyMarketAccountBalances( val balances: Option[List[(Long, BigDecimal)]])
我在本書的前言中承諾,將展示如何通過領域驅動設計將其與Eric Evans的概念聯系起來。 看看我為了完成這項工作創建的類。我們可以將整個AccountService視為一個上下文綁定,其中CheckingAccount或SavingsAccount是一個實體。其中表示余額的數字是一個值。CheckingBalances,Saving Balances和mmBalances字段是聚合,而返回類型的AccountBalances是聚合根。 最後,Vaughn Vernon在他出色的“Implementing DomainDriven Design”中指出Akka是事件驅動的上下文綁定的可能實現。 使用Akka實現命令查詢職責分離(按照Greg Young的規範)和事件源(使用開源事件源庫)也很容易。
最後,我們有代表服務接口的代理ttrait。 就像使用向服務暴露接口而不是類的實現的Java最佳實踐一樣,我們將在這裏遵循這個約定,並定義服務接口,然後可以在我們的測試中將其刪除:
trait SavingsAccountsProxy extends Actor trait CheckingAccountsProxy extends Actor trait MoneyMarketAccountsProxy extends Actor
我們舉一個actor的例子,這個actor可以作為一個代理從多個數據源獲取一個金融服務的客戶賬戶信息。此外,我們假設每個用於儲蓄、支票和貨幣市場賬戶余額的子系統代理將可選地返回該客戶的賬戶及其余額的清單,並且我們將這些作為依賴關系註入到檢索器類中。我們來編寫一些基本的Akka actor代碼來執行這個任務:
import scala.concurrent.ExecutionContext import scala.concurrent.duration._ import akka.actor._ import akka.pattern.ask import akka.util.Timeout class AccountBalanceRetriever(savingsAccounts: ActorRef, checkingAccounts: ActorRef, moneyMarketAccounts: ActorRef) extends Actor { implicit val timeout: Timeout = 100 milliseconds implicit val ec: ExecutionContext = context.dispatcher def receive = { case GetCustomerAccountBalances(id) => val futSavings = savingsAccounts ? GetCustomerAccountBalances(id) val futChecking = checkingAccounts ? GetCustomerAccountBalances(id) val futMM = moneyMarketAccounts ? GetCustomerAccountBalances(id) val futBalances = for { savings <- futSavings.mapTo[Option[List[(Long, BigDecimal)]]] checking <- futChecking.mapTo[Option[List[(Long, BigDecimal)]]] mm <- futMM.mapTo[Option[List[(Long, BigDecimal)]]] } yield AccountBalances(savings, checking, mm) futBalances map (sender ! _) } }
這段代碼非常簡潔。AccountBalanceRetriever actor收到一條獲取客戶的賬戶余額消息,然後同時觸發三個future。 第一個將獲得客戶的儲蓄賬戶余額,第二個將獲得支票賬戶余額,第三個賬戶將獲得貨幣市場余額。 並行執行這些任務可以避免按順序執行檢索的昂貴成本。 此外,請註意,雖然future會通過賬戶ID返回某些賬戶余額的期權,但如果它們返回None,這for語句並不會短路 - 如果futSaving返回None,for語句繼續執行。
然而,有幾件事情並不理想。 首先,它使用future向其他actor請求應答,這會為每個在幕後發送的消息創建一個新的PromiseActorRef。 這有點浪費資源。 最好是讓我們的AccountBalanceRetriever actor以一種“fire and forget”的方式發送消息,並將結果異步收集到一個actor中。
此外,在這個代碼中有一個明顯的競爭條件 - 你能找到它嗎? 我們在映射操作中引用來自futBalances的結果中的“sender”,這可能與未來完成時的ActorRef不同,因為AccountBalanceRetriever ActorRef現在可能正在處理另一個來自不同發件人的郵件!
避免Ask
讓我們首先關註消除在actor中要求回復的需求。 我們可以通過!發送消息,並把應答收集到一個賬號的余額包含可選值的清單中。但是我們怎麽去做呢?
import scala.concurrent.ExecutionContext import scala.concurrent.duration._ import akka.actor._ class AccountBalanceRetriever(savingsAccounts: ActorRef, checkingAccounts: ActorRef, moneyMarketAccounts: ActorRef) extends Actor { val checkingBalances, savingsBalances, mmBalances: Option[List[(Long, BigDecimal)]] = None var originalSender: Option[ActorRef] = None def receive = { case GetCustomerAccountBalances(id) => originalSender = Some(sender) savingsAccounts ! GetCustomerAccountBalances(id) checkingAccounts ! GetCustomerAccountBalances(id) moneyMarketAccounts ! GetCustomerAccountBalances(id) case AccountBalances(cBalances, sBalances, mmBalances) => (checkingBalances, savingsBalances, mmBalances) match { case (Some(c), Some(s), Some(m)) => originalSender.get ! AccountBalances(checkingBalances, savingsBalances, mmBalances) case _ => } } }
這會好一點,但仍然有很多不足之處。 首先,我們創建了在實例級收到的余額集合,這意味著我們無法把響應聚合區分為單個請求以獲取帳戶余額。 更糟糕的是,我們無法將超時的請求返回原始請求者。 最後,雖然我們已經將原始發件人捕獲為可能有值的實例變量(因為在AccountBalanceRetriever啟動時沒有originalSender),但當我們想要發回數據時,仍無法確定originalSender是否就是我們想要的值 !
捕獲上下文
問題在於我們試圖從多個來源中檢索數據的脫機操作的結果,並將其返回給首先向我們發送消息的任何人。 然而,當這些future完成時,actor可能已經開始處理其郵箱中的其他消息了,此時AccountBalanceRetriever actor中代表“sender”的狀態可能是完全不同的actor實例。 那麽我們如何解決這個問題呢?
訣竅是為正在處理的每個GetCustomerAccountBalan ces消息創建一個匿名內部actor。 通過這樣做,您可以捕捉到future填充時需要的狀態。 讓我們看看怎麽做:
import scala.concurrent.ExecutionContext import scala.concurrent.duration._ import akka.actor._ class AccountBalanceRetriever(savingsAccounts: ActorRef, checkingAccounts: ActorRef, moneyMarketAccounts: ActorRef) extends Actor { val checkingBalances, savingsBalances, mmBalances: Option[List[(Long, BigDecimal)]] = None def receive = { case GetCustomerAccountBalances(id) => { context.actorOf(Props(new Actor() { var checkingBalances, savingsBalances, mmBalances: Option[List[(Long, BigDecimal)]] = None val originalSender = sender def receive = { case CheckingAccountBalances(balances) => checkingBalances = balances isDone case SavingsAccountBalances(balances) => savingsBalances = balances isDone case MoneyMarketAccountBalances(balances) => mmBalances = balances isDone } def isDone = (checkingBalances, savingsBalances, mmBalances) match { case (Some(c), Some(s), Some(m)) => originalSender ! AccountBalances(checkingBalances, savingsBalances, mmBalances) context.stop(self) case _ => } savingsAccounts ! GetCustomerAccountBalances(id) checkingAccounts ! GetCustomerAccountBalances(id) moneyMarketAccounts ! GetCustomerAccountBalances(id) })) } } }
這樣就好多了。 我們已經捕獲了每個接收的狀態,並且只有當三個值都具有值時才將其發回給originalSender。但這裏還有兩個問題。首先,我們沒有定義在超時的時候,如何將原始請求的響應返回給請求他們的人。 其次,我們的originalSender仍然會得到一個錯誤的值 - “sender”實際上是匿名內部actor的sender值,而不是發送原始GetCustomerAccountBalan ces消息的sender值!
發送超時消息
我們可以發送一條超時消息來處理可能超時的請求,通過允許另一個任務爭用超時完成操作的權利。 這是一種非常幹凈的方式,同時仍然對請求實施超時語義。 如果在超時消息之前,所有三種帳戶類型的數據均已在郵箱中排隊,則AccountBalan ces類型的正確響應會發送回原始發件人。 但是,如果來自計劃任務的超時消息在這三個響應中的任何一個響應之前發生,則超時消息會返回給客戶端。
請註意,我僅在特定帳戶類型代理沒有返回任何數據時才使用“None”來表示。 在找到客戶但未找到任何數據的情況下,我會收到Some(List())的響應,這意味著在該賬戶類型中找不到該客戶的任何數據。這樣,我可以在語義上區分是否收到回復以及何時未找到數據。
import scala.concurrent.ExecutionContext import scala.concurrent.duration._ import org.jamieallen.effectiveakka.common._ import akka.actor.{ Actor, ActorRef, Props, ActorLogging } import akka.event.LoggingReceive object AccountBalanceRetrieverFinal { case object AccountRetrievalTimeout } class AccountBalanceRetrieverFinal(savingsAccounts: ActorRef, checkingAccounts: ActorRef, moneyMarketAccounts: ActorRef) extends Actor with ActorLogging { import AccountBalanceRetrieverFinal._ def receive = { case GetCustomerAccountBalances(id) => { log.debug(s"Received GetCustomerAccountBalances for ID: $id from $sender") val originalSender = sender context.actorOf(Props(new Actor() { var checkingBalances, savingsBalances, mmBalances: Option[List[(Long, BigDecimal)]] = None def receive = LoggingReceive { case CheckingAccountBalances(balances) => log.debug(s"Received checking account balances: $balances") checkingBalances = balances collectBalances case SavingsAccountBalances(balances) => log.debug(s"Received savings account balances: $balances") savingsBalances = balances collectBalances case MoneyMarketAccountBalances(balances) => log.debug(s"Received money market account balances: $balances") mmBalances = balances collectBalances case AccountRetrievalTimeout => sendResponseAndShutdown(AccountRetrievalTimeout) } def collectBalances = (checkingBalances, savingsBalances, mmBalances) match { case (Some(c), Some(s), Some(m)) => log.debug(s"Values received for all three account types") timeoutMessager.cancel sendResponseAndShutdown(AccountBalances(checkingBalances, savingsBalances, mmBalances)) case _ => } def sendResponseAndShutdown(response: Any) = { originalSender ! response log.debug("Stopping context capturing actor") context.stop(self) } savingsAccounts ! GetCustomerAccountBalances(id) checkingAccounts ! GetCustomerAccountBalances(id) moneyMarketAccounts ! GetCustomerAccountBalances(id) import context.dispatcher val timeoutMessager = context.system.scheduler. scheduleOnce(250 milliseconds) { self ! AccountRetrievalTimeout } })) } } }
現在我們可以收集我們的結果並檢查是否收到預期值,並將它們放入AccountBalances結果中以返回給調用方,同時也可以取消預定任務,以免浪費資源。最後,我們必須記得去停止匿名內部actor,以便在收到每個GetCustomerAc countBalances消息時不會泄露內存,而無論我們是否收到了三個響應或超時消息!
那麽我們為什麽必須將AccountRetrievalTimeout消息發送給我們自己,放入Extra actor隊列中,而不是直接將它發送回我們的scheduleOnce lambda中的原始sender? 計劃任務將在另一個線程上運行! 如果我們執行相關工作來清理該線程上的actor,我們就把並發性引入到了actor中。在這個例子中,雖然我們只是告訴actor發送消息後自行停止,但如果您不發送消息給自己,那麽會很容易的陷入關閉某個狀態並操縱它的陷阱。 還有其他一些調度接口會使某些操作更加明顯,例如此處顯示的方法調用樣式:
val timeoutMessager = context.system.scheduler.scheduleOnce(250 milliseconds, self, AccountRetrievalTimeout)
你必須對此保持警惕。 有時候,可能很容易陷入將並發性引入actor的陷阱,而此時並不應該存在任何並發性。如果你看到自己在actor上使用花括號,那就想想裏面發生的事情以及你可能需要關閉的資源。
為什麽不用promise?
在本示例的早期版本中,我嘗試使用promise來執行此項工作,其中AccountBalances類型的成功結果和超時失敗結構都放入promise的future內部。然而,這是非常復雜的,因為我們可以在消息排隊時執行相同的基本任務時允許Extra actor隊列中的排序。 但是,你也不能從promise返回future的值 - 他們不能被發送給actor,不管actor是不是遠程的。 由於位置透明度的優點,這是actor不應該關註的實現細節。
future永遠不要再actor之間傳遞,因為你不能序列化一個線程
如何測試邏輯
現在我們有一些可以工作的代碼,需要編寫一些測試來證明它的正確性。如果你是TDD的追隨者,你可能會對我沒有一開始就寫測試感到羞愧。我並不堅守什麽時候寫測試,我只關心測試寫入。
我們要做的第一件事是定義在測試中使用並作為依賴關系註入到檢索者actor中的測試存根。這些存根可以是非常簡單的actor - 當通過特定的客戶ID詢問其類型的帳戶信息時,每個沒有故障的測試案例存根將按帳戶ID返回可選的余額列表。測試中使用的每個客戶的數據都需要放入要查找的map中,如果沒有返回數據,則返回Some(List())的值以符合我們的API
import akka.actor.{ Actor, ActorLogging } import akka.event.LoggingReceive class CheckingAccountsProxyStub extends CheckingAccountsProxy with ActorLogging { val accountData = Map[Long, List[(Long, BigDecimal)]]( 1L -> List((3, 15000)), 2L -> List((6, 640000), (7, 1125000), (8, 40000))) def receive = LoggingReceive { case GetCustomerAccountBalances(id: Long) => log.debug(s"Received GetCustomerAccountBalances for ID: $id") accountData.get(id) match { case Some(data) => sender ! CheckingAccountBalances(Some(data)) case None => sender ! CheckingAccountBalances(Some(List())) } } } class SavingsAccountsProxyStub extends SavingsAccountsProxy with ActorLogging { val accountData = Map[Long, List[(Long, BigDecimal)]]( 1L -> (List((1, 150000), (2, 29000))), 2L -> (List((5, 80000)))) def receive = LoggingReceive { case GetCustomerAccountBalances(id: Long) => log.debug(s"Received GetCustomerAccountBalances for ID: $id") accountData.get(id) match { case Some(data) => sender ! SavingsAccountBalances(Some(data)) case None => sender ! SavingsAccountBalances(Some(List())) } } } class MoneyMarketAccountsProxyStub extends MoneyMarketAccountsProxy with ActorLogging { val accountData = Map[Long, List[(Long, BigDecimal)]]( 2L -> List((9, 640000), (10, 1125000), (11, 40000))) def receive = LoggingReceive { case GetCustomerAccountBalances(id: Long) => The Extra Pattern | 17 log.debug(s"Received GetCustomerAccountBalances for ID: $id") accountData.get(id) match { case Some(data) => sender ! MoneyMarketAccountBalances(Some(data)) case None => sender ! MoneyMarketAccountBalances(Some(List())) } } }
在失敗情況下(比如超時),存根將模擬長時間運行的阻塞的數據庫調用,該調用不會對調用參與者發送響應,從而無法及時完成:
class TimingOutSavingsAccountProxyStub extends SavingsAccountsProxy with ActorLogging { def receive = LoggingReceive { case GetCustomerAccountBalances(id: Long) => log.debug(s"Forcing timeout by not responding!") } }
以下示例顯示如何編寫測試用例以成功返回AccountBalances。由於本示例使用存根代理來接收帳戶信息,因此註入僅測試存根代理會導致發生超時功能,這是微不足道的。
我們還希望確保每個處理的消息的上下文的完整性由我們的檢索器維護。 為此,我們依次發送來自不同TestProbe實例的多個消息,並驗證不同的值是否正確地返回。
請註意我如何使用within塊來驗證預期響應的時間。 這是驗證您的測試正在執行以滿足系統的非功能需求的好方法。使用within塊指定執行的最大時間,正如我們在失敗情況中看到的那樣,我們沒有太早或遲到地收到響應。
最後,我們通過在我們的檢索器中註入一個超時存根來測試超時條件,並確保超時響應是我們的測試收到的響應:
import akka.testkit.{ TestKit, TestProbe, ImplicitSender } import akka.actor.{ Actor, ActorLogging, ActorSystem, Props } import org.scalatest.WordSpecLike import org.scalatest.matchers.MustMatchers import scala.concurrent.duration._ import org.jamieallen.effectiveakka.common._ import org.jamieallen.effectiveakka.pattern.extra.AccountBalanceRetrieverFinal._ class ExtraFinalSpec extends TestKit(ActorSystem("ExtraTestAS")) with ImplicitSender with WordSpecLike with MustMatchers { "An AccountBalanceRetriever" should { "return a list of account balances" in { 18 | Chapter 2: Patterns of Actor Usage val probe2 = TestProbe() val probe1 = TestProbe() val savingsAccountsProxy = system.actorOf(Props[SavingsAccountsProxyStub], "extra-success-savings") val checkingAccountsProxy = system.actorOf(Props[CheckingAccountsProxyStub], "extra-success-checkings") val moneyMarketAccountsProxy = system.actorOf( Props[MoneyMarketAccountsProxyStub], "extra-success-money-markets") val accountBalanceRetriever = system.actorOf( Props(new AccountBalanceRetrieverFinal(savingsAccountsProxy, checkingAccountsProxy, moneyMarketAccountsProxy)), "extra-retriever") within(300 milliseconds) { probe1.send(accountBalanceRetriever, GetCustomerAccountBalances(1L)) val result = probe1.expectMsgType[AccountBalances] result must equal(AccountBalances( Some(List((3, 15000))), Some(List((1, 150000), (2, 29000))), Some(List()))) } within(300 milliseconds) { probe2.send(accountBalanceRetriever, GetCustomerAccountBalances(2L)) val result = probe2.expectMsgType[AccountBalances] result must equal(AccountBalances( Some(List((6, 640000), (7, 1125000), (8, 40000))), Some(List((5, 80000))), Some(List((9, 640000), (10, 1125000), (11, 40000))))) } } "return a TimeoutException when timeout is exceeded" in { val savingsAccountsProxy = system.actorOf( Props[TimingOutSavingsAccountProxyStub], "extra-timing-out-savings") val checkingAccountsProxy = system.actorOf( Props[CheckingAccountsProxyStub], "extra-timing-out-checkings") val moneyMarketAccountsProxy = system.actorOf( Props[MoneyMarketAccountsProxyStub], "extra-timing-out-money-markets") val accountBalanceRetriever = system.actorOf( Props(new AccountBalanceRetrieverFinal(savingsAccountsProxy, checkingAccountsProxy, moneyMarketAccountsProxy)), "extra-timing-out-retriever") val probe = TestProbe() within(250 milliseconds, 500 milliseconds) { probe.send(accountBalanceRetriever, GetCustomerAccountBalances(1L)) probe.expectMsg(AccountRetrievalTimeout) } The Extra Pattern | 19 } } }
現在我們的測試檢查了成功案例,失敗導致預期的行為。因為AccountRetrievalTimeout是一個case對象,它是一個“term”,而不是“type”,因此我可以使用expectMsg()方法而不是expectMsgType []
即使使用強大的工具,異步編程也並非易事。 我們總是必須考慮到我們需要的狀態以及我們在需要的時候獲取它的環境
【翻譯】- EffectiveAkka-第二章(一)