Akka-CQRS(5)- CQRS Writer Actor 部署和測試
上篇我們做了一個WriterActor的例子,主要目的是示範WriterActor如何作為叢集分片用persistentActor特性及event-sourcing模式實現CQRS的寫功能。既然是叢集分片,那麼我們就在這篇講講WriterActor的部署和測試,因為這個裡面還是有些值得注意的地方。下面是一段WriteActor,即叢集分片(cluster-sharding)的部署程式碼:
ClusterSharding(system).start( typeName = shardName, entityProps = writerProps, settings = cpsSettings, extractEntityId = getPOSId, extractShardId = getShopId, allocationStrategy = ClusterSharding(system).defaultShardAllocationStrategy(cpsSettings), handOffStopMessage = PassivatePOS )
注意帶handOffStopMessage引數的start函式必須同時提供allocationStrategy。這個引數提供了passivation訊息型別。
整個叢集分片部署程式碼如下:
object POSRouter extends LogSupport { def main(args: Array[String]) { import WriterActor._ import Commands._ val argsPat = "(.*):(.*)".r val (host, port) = args(0) match { case argsPat(h, p) => (h, p) case _ => ("localhost", "2551") } val config = ConfigFactory.parseString("akka.remote.netty.tcp.port=\"" + port + "\"") .withFallback(ConfigFactory.parseString("akka.remote.netty.tcp.hostname=\"" + host + "\"")) //roles can be deployed on this node .withFallback(ConfigFactory.parseString("akka.cluster.roles = [poswriter]")) .withFallback(ConfigFactory.load()) log.info(s"******* hostname = $host,port = $port *******") val shardName = "POSShard" case class POSMessage(id: Long, cmd: POSCommand) { def shopId = id.toString.head.toString def posId = id.toString } val getPOSId: ShardRegion.ExtractEntityId = { case posCommand: POSMessage => (posCommand.posId, posCommand.cmd) } val getShopId: ShardRegion.ExtractShardId = { case posCommand: POSMessage => posCommand.shopId } val system = ActorSystem("cloud-pos-server", config) val role = "poswriter"//role of this shard val cpsSettings = ClusterShardingSettings(system).withRole(role) //.withPassivateIdleAfter(10 minutes) ClusterSharding(system).start( typeName = shardName, entityProps = writerProps, settings = cpsSettings, extractEntityId = getPOSId, extractShardId = getShopId, allocationStrategy = ClusterSharding(system).defaultShardAllocationStrategy(cpsSettings), handOffStopMessage = PassivatePOS ) system.actorOf(ClusterMonitor.props, "cps-cluster-monitor") } }
以上有幾個引數需要特別注意:host和port是從main的引數解析出來的如192.168.11.162:2551,代表本節點的host和port。akka.cluster.roles代表本節點支援的角色,這裡poswriter是其中之一。而ClusterShardingSettings(system).withRole("poswriter")代表這個分片shard只能在支援poswriter角色的節點上部署。如果搞錯了執行時你會發現Sharding無法啟動。上面這段程式代表本節點支援poswriter角色。在本節點(輸入的IP地址)部署了一個名稱為“POSShard”的cluster-sharding,它具備poswriter角色。
如果我在多部機器上執行這段程式碼,輸入當前機器的IP+PORT就代表在這麼多臺機器上都部署了“POSShard”分片。上面的ClusterMonitor是個叢集狀態監控actor:
package sdp.cluster.monitor import akka.actor._ import akka.cluster.ClusterEvent._ import akka.cluster._ import sdp.logging.LogSupport object ClusterMonitor { def props = Props(new ClusterMonitor) } class ClusterMonitor extends Actor with LogSupport { val cluster = Cluster(context.system) override def preStart(): Unit = { cluster.subscribe(self,initialStateMode = InitialStateAsEvents ,classOf[MemberEvent],classOf[UnreachableMember])//訂閱叢集狀態轉換資訊 super.preStart() } override def postStop(): Unit = { cluster.unsubscribe(self)//取消訂閱 super.postStop() } override def receive: Receive = { case MemberJoined(member) => log.info(s"Member is Joining: {${member.address}}") case MemberUp(member) => log.info(s"Member is Up: {${member.address}}") case MemberLeft(member) => log.info(s"Member is Leaving: {${member.address}}") case MemberExited(member) => log.info(s"Member is Exiting: {${member.address}}") case MemberRemoved(member, previousStatus) => log.info( s"Member is Removed: {${member.address}} after {${previousStatus}") case UnreachableMember(member) => log.info(s"Member detected as unreachable: {${member.address}}") cluster.down(member.address)//手工驅除,不用auto-down case _: MemberEvent => // ignore } }
有了它我們可以監視叢集節點連線狀態。
好了,現在假設我們在幾臺機器組成的叢集各節點上都部署了“POSShard”分片,那麼就設計個客戶端來向這個“POSShard”分片傳送POSMessage:
case class POSMessage(id: Long, cmd: POSCommand) { def shopId = id.toString.head.toString def posId = id.toString } val getPOSId: ShardRegion.ExtractEntityId = { case posCommand: POSMessage => (posCommand.posId, posCommand.cmd) } val getShopId: ShardRegion.ExtractShardId = { case posCommand: POSMessage => posCommand.shopId }
這個客戶端必須考慮以下幾點:它必須在同一個叢集,也就是它也是叢集其中一個節點,否則無法和其它部署了“POSShard”分片的節點進行資訊交流。但它又不能同處與部署了“POSShard”的節點,因為remote的hostname和port已經被佔用。所以只能把客戶端放在一個沒有部署“POSShard”的節點上,然後用ClusterSharding(system).startProxy來啟動一個分片中介:
//no shard deployed on this node2558, use proxy val posHandler = ClusterSharding(system).startProxy( typeName = shardName, role = Some("poswriter"), extractEntityId = getPOSId, extractShardId = getShopId ) //val posHandler = ClusterSharding(system).shardRegion(shardName) system.actorOf(POSClient.props(posHandler), "pos-client")
注意這個proxy的role必須是Some("poswriter"),只有這樣才能呼叫其它節點上的”POSShard“,因為它們的角色都是“poswriter”。與WriterActor互動的必須是個actor,因為WriterActor會用sender()返回結果,這個sender()是個ActorRef:
object POSClient { def props(pos: ActorRef) = Props(new POSClient(pos)) } class POSClient(posHandler: ActorRef)extends Actor with LogSupport { override def receive: Receive = { case msg @ POSMessage(_,_) => posHandler ! msg case resp: POSResponse=> log.info(s"response from server: $resp") } }
我們可用下面的方式來指揮WriterActor:
val posref = system.actorOf(POSClient.props(posHandler), "pos-client") posref ! POSMessage(1021, LogSales(SALESTYPE.plu, "", apple.code, 1, 0)) posref ! POSMessage(2021, LogSales(SALESTYPE.plu, "", pineapple.code, 2, 0)) posref ! POSMessage(3021, LogSales(SALESTYPE.plu, "", banana.code, 1, 0)) posref ! POSMessage(4021, LogSales(SALESTYPE.plu, "", grape.code, 3, 0)) posref ! POSMessage(4021,Subtotal)
下面是服務端分片部署原始碼:
resources/application.conf
akka.actor.warn-about-java-serializer-usage = off akka.log-dead-letters-during-shutdown = off akka.log-dead-letters = off akka { loglevel = INFO actor { provider = "cluster" } remote { log-remote-lifecycle-events = off netty.tcp { hostname = "127.0.0.1" port = 0 } } cluster { seed-nodes = [ "akka.tcp://[email protected]:2551"] log-info = off sharding { role = "poswriter" passivate-idle-entity-after = 5 m } } persistence { journal.plugin = "cassandra-journal" snapshot-store.plugin = "cassandra-snapshot-store" } } cassandra-journal { contact-points = ["192.168.11.162"] } cassandra-snapshot-store { contact-points = ["192.168.11.162"] }
POSRouter.scala
package cloud.pos.server import akka.actor._ import akka.cluster.sharding._ import akka.cluster.sharding.ClusterSharding import com.typesafe.config.ConfigFactory import sdp.cluster.monitor._ import sdp.logging._ object POSRouter extends LogSupport { def main(args: Array[String]) { import WriterActor._ import Commands._ val argsPat = "(.*):(.*)".r val (host, port) = args(0) match { case argsPat(h, p) => (h, p) case _ => ("localhost", "2551") } val config = ConfigFactory.parseString("akka.remote.netty.tcp.port=\"" + port + "\"") .withFallback(ConfigFactory.parseString("akka.remote.netty.tcp.hostname=\"" + host + "\"")) //roles can be deployed on this node .withFallback(ConfigFactory.parseString("akka.cluster.roles = [poswriter]")) .withFallback(ConfigFactory.load()) log.info(s"******* hostname = $host,port = $port *******") val shardName = "POSShard" case class POSMessage(id: Long, cmd: POSCommand) { def shopId = id.toString.head.toString def posId = id.toString } val getPOSId: ShardRegion.ExtractEntityId = { case posCommand: POSMessage => (posCommand.posId, posCommand.cmd) } val getShopId: ShardRegion.ExtractShardId = { case posCommand: POSMessage => posCommand.shopId } val system = ActorSystem("cloud-pos-server", config) val role = "poswriter"//role of this shard val cpsSettings = ClusterShardingSettings(system).withRole(role) //.withPassivateIdleAfter(10 minutes) ClusterSharding(system).start( typeName = shardName, entityProps = writerProps, settings = cpsSettings, extractEntityId = getPOSId, extractShardId = getShopId, allocationStrategy = ClusterSharding(system).defaultShardAllocationStrategy(cpsSettings), handOffStopMessage = PassivatePOS ) system.actorOf(ClusterMonitor.props, "cps-cluster-monitor") } }
下面是這個測試專案的原始碼:
build.sbt
name := "cloud-pos-client" version := "0.1" scalaVersion := "2.12.8" libraryDependencies := Seq( "com.typesafe.akka" %% "akka-cluster-sharding" % "2.5.19", "com.typesafe.akka" %% "akka-persistence" % "2.5.19", "com.typesafe.akka" %% "akka-persistence-cassandra" % "0.93", "com.typesafe.akka" %% "akka-persistence-cassandra-launcher" % "0.93" % Test, "ch.qos.logback"%"logback-classic"% "1.2.3" )
resources/application.conf
akka.actor.warn-about-java-serializer-usage = off akka.log-dead-letters-during-shutdown = off akka.log-dead-letters = off akka { loglevel = INFO actor { provider = "cluster" } remote { log-remote-lifecycle-events = off netty.tcp { hostname = "192.168.11.162" port = 2558 } } cluster { seed-nodes = [ "akka.tcp://[email protected]:2551"] log-info = off } }
resources/logback.xml
<?xml version="1.0" encoding="UTF-8"?> <configuration> <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> <encoder> <Pattern> %d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{36} - %msg%n </Pattern> </encoder> </appender> <root level="debug"> <appender-ref ref="STDOUT" /> </root> </configuration>
ClientDemo.scala
package cloud.pos.client import akka.actor._ import akka.cluster.sharding.ClusterSharding import sdp.cluster.monitor._ import sdp.logging._ import Commands._ import States._ import Items._ import akka.cluster.sharding._ object POSClientDemo extends LogSupport { def main(args: Array[String]) { val system = ActorSystem("cloud-pos-server") val shardName = "POSShard" val getPOSId: ShardRegion.ExtractEntityId = { case posCommand: POSMessage => (posCommand.posId, posCommand.cmd) } val getShopId: ShardRegion.ExtractShardId = { case posCommand: POSMessage => posCommand.shopId } //no shard deployed on this node2558, use proxy val posHandler = ClusterSharding(system).startProxy( typeName = shardName, role = Some("poswriter"), extractEntityId = getPOSId, extractShardId = getShopId ) //val posHandler = ClusterSharding(system).shardRegion(shardName) system.actorOf(ClusterMonitor.props, "cps-cluster-monitor") val posref = system.actorOf(POSClient.props(posHandler), "pos-client") posref ! POSMessage(1021, LogSales(SALESTYPE.plu, "", apple.code, 1, 0)) posref ! POSMessage(2021, LogSales(SALESTYPE.plu, "", pineapple.code, 2, 0)) posref ! POSMessage(3021, LogSales(SALESTYPE.plu, "", banana.code, 1, 0)) posref ! POSMessage(4021, LogSales(SALESTYPE.plu, "", grape.code, 3, 0)) posref ! POSMessage(4021,Subtotal) scala.io.StdIn.readLine() system.terminate() } }
client/Commands.scala
package cloud.pos.client object Commands { sealed trait POSCommand {} case class LogOn(opr: String, passwd: String) extends POSCommand case object LogOff extends POSCommand case class SuperOn(su: String, passwd: String) extends POSCommand case object SuperOff extends POSCommand case class MemberOn(cardnum: String, passwd: String) extends POSCommand case object MemberOff extends POSCommand//remove member status for the voucher case object RefundOn extends POSCommand case object RefundOff extends POSCommand case object VoidOn extends POSCommand case object VoidOff extends POSCommand case object VoidAll extends POSCommand case object Suspend extends POSCommand case class VoucherNum(vnum: Int) extends POSCommand case class LogSales(salesType: Int, dpt: String, code: String, qty: Int, price: Int) extends POSCommand case object Subtotal extends POSCommand case class Discount(code: String, percent: Int) extends POSCommand case class OfflinePay(acct: String, num: String, amount: Int) extends POSCommand//settlement結算支付 //read only command, no event process case class VCBalance(acct: String, num: String, passwd: String) extends POSCommand case class VCPay(acct: String, num: String, passwd: String, amount: Int) extends POSCommand case class AliPay(acct: String, num: String, amount: Int) extends POSCommand case class WxPay(acct: String, num: String, amount: Int) extends POSCommand // read only command, no update event case class Plu(itemCode: String) extends POSCommand//read only case class POSMessage(id: Long, cmd: POSCommand) { def shopId = id.toString.head.toString def posId = id.toString } }
client/States.scala
package cloud.pos.client object States { object TXNTYPE { val sales: Int = 0 val refund: Int = 1 val void: Int = 2 val voided: Int = 3 val voidall: Int = 4 val subtotal: Int = 5 val logon: Int = 6 val supon: Int = 7// super user on/off val suspend: Int = 8 } object SALESTYPE { val plu: Int = 0 val dpt: Int = 1 val cat: Int = 2 val brd: Int = 3 val ra:Int = 4 val sub: Int = 5 val ttl: Int = 6 val dsc: Int = 7 val crd: Int = 8 } case class TxnItem( txndate: String = "" ,txntime: String = "" ,opr: String = ""//工號 ,num: Int = 0 //銷售單號 ,seq: Int = 1 //交易序號 ,txntype: Int = TXNTYPE.sales//交易型別 ,salestype: Int = SALESTYPE.plu //銷售型別 ,qty: Int =1 //交易數量 ,price: Int = 0 //單價(分) ,amount: Int = 0 //碼洋(分) ,dscamt: Int = 0 //折扣:負值net實洋 = amount + dscamt ,member: String = "" //會員卡號 ,code: String = "" //編號(商品、賬號...) ,desc: String = "" //專案名稱 ,dpt: String = "" ,department: String = "" ,cat: String = "" ,category: String = "" ,brd: String = "" ,brand: String = "" ) case class VchStatus( //操作狀態鎖留給前端維護 qty: Int = 1, refund: Boolean = false, void: Boolean = false) case class VchStates( opr: String = "",//收款員 jseq: BigInt = 0,//begin journal sequence for read-side replay num: Int = 0,//當前單號 seq: Int = 1,//當前序號 void: Boolean = false, //取消模式 refd: Boolean = false, //退款模式 due: Boolean = true,//當前餘額 su: String = "", mbr: String = "" ) }
client/POSClient.scala
package cloud.pos.client import akka.actor._ import sdp.logging._ import Responses._ import Commands._ object POSClient { def props(pos: ActorRef) = Props(new POSClient(pos)) } class POSClient(posHandler: ActorRef)extends Actor with LogSupport { override def receive: Receive = { case msg @ POSMessage(_,_) => posHandler ! msg case resp: POSResponse=> log.info(s"response from server: $resp") } }
client/Responses.scala
package cloud.pos.client import States._ object Responses { object STATUS { val OK: Int = 0 val FAIL: Int = -1 } case class POSResponse (sts: Int, msg: String, voucher: VchStates, txnItems: List[TxnItem]) }
client/DataAccess.scala
package cloud.pos.client import java.time.LocalDate import java.time.format.DateTimeFormatter case class Item( brd: String ,dpt: String ,cat: String ,code: String ,name: String ,price: Int ) object Items { val apple = Item("01","02","01","001", "green apple", 820) val grape = Item("01","02","01","002", "red grape", 1050) val orage = Item("01","02","01","003", "sunkist orage", 350) val banana = Item("01","02","01","004", "demon banana", 300) val pineapple = Item("01","02","01","005", "hainan pineapple", 1300) val peach = Item("01","02","01","006", "xinjiang peach", 2390) val tblItems = List(apple, grape, orage, banana, pineapple, peach) sealed trait QueryItemsResult {} case class QueryItemsOK(items: List[Item]) extends QueryItemsResult case class QueryItemsFail(msg: String) extends QueryItemsResult } object Codes { case class User(code: String, name: String, passwd: String) case class Department(code: String, name: String) case class Category(code: String, name: String) case class Brand(code: String, name: String) case class Ra(code: String, name: String) case class Account(code: String, name: String) case class Disc(code: String, best: Boolean, aggr: Boolean, group: Boolean) val ras = List(Ra("01","Delivery"),Ra("02","Cooking")) val dpts = List(Department("01","Fruit"),Department("02","Grocery")) val cats = List(Category("0101","Fresh Fruit"),Category("0201","Dry Grocery")) val brds = List(Brand("01","Sunkist"),Brand("02","Demon")) val accts = List(Account("001","Cash"),Account("002","Value Card"), Account("003", "Visa") ,Account("004","Alipay"),Account("005","WXPay")) val users = List(User("1001","Tiger", "123"),User("1002","John", "123"),User("1003","Maria", "123")) def getDpt(code: String) = dpts.find(d => d.code == code) def getCat(code: String) = cats.find(d => d.code == code) def getBrd(code: String) = brds.find(b => b.code == code) def getAcct(code: String) = accts.find(a => a.code == code) def getRa(code: String) = ras.find(a => a.code == code) } object DAO { import Items._ import Codes._ def getItem(code: String): QueryItemsResult = { val optItem = tblItems.find(it => it.code == code) optItem match { case Some(item) => QueryItemsOK(List(item)) case None => QueryItemsFail("Invalid item code!") } } def validateDpt(code: String) = dpts.find(d => d.code == code) def validateCat(code: String) = cats.find(d => d.code == code) def validateBrd(code: String) = brds.find(b => b.code == code) def validateRa(code: String) = ras.find(ac => ac.code == code) def validateAcct(code: String) = accts.find(ac => ac.code == code) def validateUser(userid: String, passwd: String) = users.find(u => (u.code == userid && u.passwd == passwd)) def lastSecOfDateStr(ldate: LocalDate): String = { ldate.format(DateTimeFormatter.ofPattern( "yyyy-MM-dd"))+" 23:59:59" } }
logging/Log.scala
package sdp.logging import org.slf4j.Logger /** * Logger which just wraps org.slf4j.Logger internally. * * @param logger logger */ class Log(logger: Logger) { // use var consciously to enable squeezing later var isDebugEnabled: Boolean = logger.isDebugEnabled var isInfoEnabled: Boolean = logger.isInfoEnabled var isWarnEnabled: Boolean = logger.isWarnEnabled var isErrorEnabled: Boolean = logger.isErrorEnabled def withLevel(level: Symbol)(msg: => String, e: Throwable = null): Unit = { level match { case 'debug | 'DEBUG => debug(msg) case 'info | 'INFO => info(msg) case 'warn | 'WARN => warn(msg) case 'error | 'ERROR => error(msg) case _ => // nothing to do } } def debug(msg: => String): Unit = { if (isDebugEnabled && logger.isDebugEnabled) { logger.debug(msg) } } def debug(msg: => String, e: Throwable): Unit = { if (isDebugEnabled && logger.isDebugEnabled) { logger.debug(msg, e) } } def info(msg: => String): Unit = { if (isInfoEnabled && logger.isInfoEnabled) { logger.info(msg) } } def info(msg: => String, e: Throwable): Unit = { if (isInfoEnabled && logger.isInfoEnabled) { logger.info(msg, e) } } def warn(msg: => String): Unit = { if (isWarnEnabled && logger.isWarnEnabled) { logger.warn(msg) } } def warn(msg: => String, e: Throwable): Unit = { if (isWarnEnabled && logger.isWarnEnabled) { logger.warn(msg, e) } } def error(msg: => String): Unit = { if (isErrorEnabled && logger.isErrorEnabled) { logger.error(msg) } } def error(msg: => String, e: Throwable): Unit = { if (isErrorEnabled && logger.isErrorEnabled) { logger.error(msg, e) } } }
logging/LogSupport.scala
package sdp.logging import org.slf4j.LoggerFactory trait LogSupport { /** * Logger */ protected val log = new Log(LoggerFactory.getLogger(this.getClass)) }
logging/ClusterMonitor.scala
package sdp.cluster.monitor import akka.actor._ import akka.cluster.ClusterEvent._ import akka.cluster._ import sdp.logging.LogSupport object ClusterMonitor { def props = Props(new ClusterMonitor()) } class ClusterMonitor extends Actor with LogSupport { val cluster = Cluster(context.system) override def preStart(): Unit = { cluster.subscribe(self,initialStateMode = InitialStateAsEvents ,classOf[MemberEvent],classOf[UnreachableMember])//訂閱叢集狀態轉換資訊 super.preStart() } override def postStop(): Unit = { cluster.unsubscribe(self)//取消訂閱 super.postStop() } override def receive: Receive = { case MemberJoined(member) => log.info(s"Member is Joining: {${member.address}}") case MemberUp(member) => log.info(s"Member is Up: {${member.address}}") case MemberLeft(member) => log.info(s"Member is Leaving: {${member.address}}") case MemberExited(member) => log.info(s"Member is Exiting: {${member.address}}") case MemberRemoved(member, previousStatus) => log.info( s"Member is Removed: {${member.address}} after {${previousStatus}") case UnreachableMember(member) => log.info(s"Member detected as unreachable: {${member.address}}") cluster.down(member.address)//手工驅除,不用auto-down case _: MemberEvent => // ignore } }