1. 程式人生 > >Akka-CQRS(1)- Write-side, Persisting event sources:CQRS存寫端操作方式

Akka-CQRS(1)- Write-side, Persisting event sources:CQRS存寫端操作方式

proto model cover 如何 RKE deletion election payload sha

上篇我們提到CQRS是一種讀寫分離式高並發、大流量數據錄入體系,其中存寫部分是通過event-sourcing+akka-persistence實現的。也可以這樣理解:event-sourcing(事件源)是一種特殊數據錄入模式,akka-persistence是這種模式的具體實現方式。事件源的核心思想是把某寫發生的事件寫入log(journal)。這些事件是能成功改變系統狀態,並且時已經發生的事情。一開始我常常把事件源和命令源(command-sourcing)混為一談。它們根本的區別事件event是已經發生的,命令command是待發生的。如果我們把命令存入journal,在對journal裏的記錄進行重新演算(replay)時就會執行命令並產生一些副作,如顯示打印、發email等等。而系統狀態和環境隨著時間在不斷變化,這些副作用也會在不同的時間產生不同的影響,這肯定是我們不想看見的。

事件源模式中,在內存裏保存能代表程序狀態的對象state-objects,這些狀態對象與數據庫表model之間建立了對應關系。假設程序中支持某些指令command,它們會改變程序的狀態並且還可能還會產生一些副作用,那麽用事件源做法的操作順序應該是:產生副作用->存寫事件->改變內存裏的狀態對象。其中任何一個環節失敗都會放棄下面的環節。另一方面,在用journal中記錄進行重新演算時,就需要先把發生的事件還原成改變狀態的命令,人為的免去副作用,因為它已經在正確的時間產生過了,然後只要更新數據庫model狀態就算完成了。所以,實現persistence包括object和model之間對應、state-objects維護方式以及command和event之間的轉換。

首先分析一下command與event之間的轉換:我們還是用上一篇的POS收銀系統做示範。下面是幾個收銀操作指令:

  case class GetItemInfo(itemcode: String) extends Command   
  case class AddItem(item: Item, qty: Int) extends Command   
  case class AliPay(amount: Double) extends Command

上面三個典型command可以下面的方式轉換成event:

GetItemInfo:這是一個查詢商品資料的指令,不影響交易狀態,不必轉換

AddItem: 這個指令只影響交易狀態,沒有副作用,轉換成 :ItemAdded(item: Item, qty: Int) extends Event

AliPay:改變交易狀態並且產生副作用,因為要即時從支付寶扣款。做法:先確定支付成功,然後轉成: AliPaid(amount Double) extends Event

  case class ItemAdded(item: Item, qty: Int) extends Event
  case class AliPaid(amount: Double) extends Event

POS收銀交易狀態是一張未結算賬單內容,是個簡單的交易記錄清單SalesMemo:

  //交易記錄
  case class TxnItem(
     num: Int  //銷售單號
    ,seq: Int  //交易序號
    ,txntype: Int //交易類型編號
    ,code: String //編號(商品、賬號...)
    ,qty: Int //交易數量
    ,price: Int //單價(分)
    ,amount: Int //金額(分)
    )
  case class SalesMemo(salesnum: Int, txnitems: List[TxnItem] = Nil) {
    def itemAdded(evt: Event): SalesMemo = evt match {
      case ItemAdded(item,qty) =>
        copy(txnitems = TxnItem(salesnum, txnitems.length+1,0,item.code,qty,item.price,qty * item.price) :: txnitems)
      case _ => this
    }

    def aliPaid(evt: Event) = evt match {
      case AliPaid(amt) =>
        copy(txnitems = TxnItem(salesnum,txnitems.length+1,0,ali,1,amt,amt) :: items)
      case _ => this
    }
  }

itemAdded,aliPaid這兩個函數分別代表AddItem和AliPay對狀態對象的轉變處理。

上面提到persistenceActor存寫journal時對事件發生的順序有嚴格要求,否則無法實現讀取端正確恢復原始狀態。這項要求的實現是通過persist/persistAsync這兩種函數來實現的。下面是這幾類函數的款式:

//無幹擾存寫,後面進來的消息先存放在內部的臨時存放點 message-stashing
  def persist[A](event: A)(handler: A ? Unit): Unit = {
    internalPersist(event)(handler)
  }

//同時存寫多個事件
  def persistAll[A](events: immutable.Seq[A])(handler: A ? Unit): Unit = {
    internalPersistAll(events)(handler)
  }

//異步存寫事件,沒有臨時存放點機制  no-message-stashing
  def persistAsync[A](event: A)(handler: A ? Unit): Unit = {
    internalPersistAsync(event)(handler)
  }

//異步存寫多項事件
  def persistAllAsync[A](events: immutable.Seq[A])(handler: A ? Unit): Unit = {
    internalPersistAllAsync(events)(handler)
  }

//不存寫事件,利用內部臨時存放點機制來保證handler執行順序
  def defer[A](event: A)(handler: A ? Unit): Unit = {
    internalDefer(event)(handler)
  }

//不存寫事件,只保證handler運行順序
  def deferAsync[A](event: A)(handler: A ? Unit): Unit = {
    internalDeferAsync(event)(handler)
  }

無論如何,handler函數都保證在事件存寫動作成功後才能運行。我們用一些偽代碼來示範有臨存stash及無臨存no-stash時handler運行的順序:

  override def receiveCommand: Receive = {
    case c: String ? {
      sender() ! c
      persist(s"evt-$c-1") { e ? sender() ! e }
      persist(s"evt-$c-2") { e ? sender() ! e }
      defer(s"evt-$c-3") { e ? sender() ! e }
    }
  }

//有內部臨存 with message stashing
persistentActor ! "a"
persistentActor ! "b"

// order of received messages:
// a
// evt-a-1
// evt-a-2
// evt-a-3
// b
// evt-b-1
// evt-b-2
// evt-b-3

----------------------------------
  override def receiveCommand: Receive = {
    case c: String ? {
      sender() ! c
      persistAsync(s"evt-$c-1") { e ? sender() ! e }
      persistAsync(s"evt-$c-2") { e ? sender() ! e }
      deferAsync(s"evt-$c-3") { e ? sender() ! e }
    }
  }

persistentActor ! "a"
persistentActor ! "b"

// order of received messages:
// a
// b           //無臨存機制,外部信息立即處理了
// evt-a-1
// evt-a-2
// evt-a-3
// evt-b-1
// evt-b-2
// evt-b-3

如果發生內嵌多層persist時,正確的順序如下:

override def receiveCommand: Receive = {
  case c: String ?
    sender() ! c

    persist(s"$c-1-outer") { outer1 ?
      sender() ! outer1
      persist(s"$c-1-inner") { inner1 ?
        sender() ! inner1
      }
    }

    persist(s"$c-2-outer") { outer2 ?
      sender() ! outer2
      persist(s"$c-2-inner") { inner2 ?
        sender() ! inner2
      }
    }
}

persistentActor ! "a"
persistentActor ! "b"

// order of received messages:
// a
// a-outer-1
// a-outer-2
// a-inner-1
// a-inner-2
// and only then process "b"
// b
// b-outer-1
// b-outer-2
// b-inner-1
// b-inner-2

--------------------------------
override def receiveCommand: Receive = {
  case c: String ?
    sender() ! c
    persistAsync(c + "-outer-1") { outer ?
      sender() ! outer
      persistAsync(c + "-inner-1") { inner ? sender() ! inner }
    }
    persistAsync(c + "-outer-2") { outer ?
      sender() ! outer
      persistAsync(c + "-inner-2") { inner ? sender() ! inner }
    }
}

persistentActor ! "a"
persistentActor ! "b"

// order of received messages:
// a
// b
// a-outer-1
// a-outer-2
// b-outer-1
// b-outer-2
// a-inner-1
// a-inner-2
// b-inner-1
// b-inner-2

// which can be seen as the following causal relationship:
// a -> a-outer-1 -> a-outer-2 -> a-inner-1 -> a-inner-2
// b -> b-outer-1 -> b-outer-2 -> b-inner-1 -> b-inner-2

值得註意的是這個handler函數只會在事件存寫成功後才運行,失敗則否。也就是說確認了事件已經安全存寫後才更新state-objects狀態(model狀態在CQRS讀取時再相應更新)。針對上面的POS例子裏可以用下面的代碼處理方式:

  override def receiveCommand: Receive = {
    case AddItem(item,qty) =>
       persist(ItemAdded(item,qty))(salesMemo.itemAdded)
    case AliPay(amt) =>
      try {
        if (aliOnlinePay(amt))  //先產生副作用
          persist(AliPaid(amt))(salesMemo.alipaid(_))
      } catch {
        case _ > Throw new OnlinePayExecption("boom!!!")
      }
...

akka-persistence代表CQRS模式中以事件源方式存寫數據的具體實現。我們提到過,數據存寫具體做法是向一個journal裏寫入發生的改變狀態目標state-objects的事件。每次PersistenceActor啟動時都會從journal裏讀取之前寫入的事件、還原成指令command、然後逐步把state-objects恢復到上次停止時的狀態,不管是因異常還是正常停止的。這個恢復狀態的過程是由PersistenceActor的receiveRecovery函數實現的,如下:

  override def receiveRecover: Receive = {
    case evt: Event => 
           salesMemo = salesMemo.updateMemo(evt)
    case SnapshotOffer(_,loggedItems: SalesMemo) =>
           salesMemo = loggedItems
  }

按理來說恢復狀態即是把事件從頭到尾再演算一遍。不過這種方式效率是個大問題,試想每次啟動都需要先讀取幾十萬條數據會是怎樣的感受。效率問題的解決方法就是通過存寫快照方式把之前的事件總結成快照snapshot形式的階段狀態,然後存入快照庫(snapshot-store)。這樣在PersistenceActor啟動時先用最後一個快照把狀態恢復到一個階段,然後再讀取快照產生之後的所有事件對階段性狀態再轉換成最新狀態。快照的讀寫函數如下:

def saveSnapshot(snapshot: Any): Unit = {
    snapshotStore ! SaveSnapshot(SnapshotMetadata(snapshotterId, snapshotSequenceNr), snapshot)
  }

/**
 * Offers a [[PersistentActor]] a previously saved `snapshot` during recovery. This offer is received
 * before any further replayed messages.
 */
@SerialVersionUID(1L)
final case class SnapshotOffer(metadata: SnapshotMetadata, snapshot: Any)

/**
 * Snapshot metadata.
 *
 * @param persistenceId id of persistent actor from which the snapshot was taken.
 * @param sequenceNr sequence number at which the snapshot was taken.
 * @param timestamp time at which the snapshot was saved, defaults to 0 when unknown.
 */
@SerialVersionUID(1L) //#snapshot-metadata
final case class SnapshotMetadata(persistenceId: String, sequenceNr: Long, timestamp: Long = 0L)
//#snapshot-metadata

PersistenceActor裏有receiveRecover和receiveCommand兩個抽象函數,必須由用戶提供具體的實現。這兩個函數代表了PersistentActor的兩大功能:狀態復原和消息處理。狀態復原是通過receiveRecover對snapshot-store和journal裏的記錄處理實現的。而PersistentActor的receiveCommand就是普通Actor的receive消息處理函數。用戶可以通過PersistentActor提供的回調(callback)函數來進行事件讀取過程前的事前準備和後面的事後處理。可以對這些callback函數進行重載(override)來自定義這些處理程序,如:

/**
   * Called whenever a message replay fails. By default it logs the error.
   *
   * Subclass may override to customize logging.
   *
   * The actor is always stopped after this method has been invoked.
   *
   * @param cause failure cause.
   * @param event the event that was processed in `receiveRecover`, if the exception
   *   was thrown there
   */
  protected def onRecoveryFailure(cause: Throwable, event: Option[Any]): Unit =
...
 /**
   * Called when persist fails. By default it logs the error.
   * Subclass may override to customize logging and for example send negative
   * acknowledgment to sender.
   *
   * The actor is always stopped after this method has been invoked.
   *
   * Note that the event may or may not have been saved, depending on the type of
   * failure.
   *
   * @param cause failure cause.
   * @param event the event that was to be persisted
   */
  protected def onPersistFailure(cause: Throwable, event: Any, seqNr: Long): Unit = {
...
  /**
   * Called when the journal rejected `persist` of an event. The event was not
   * stored. By default this method logs the problem as a warning, and the actor continues.
   * The callback handler that was passed to the `persist` method will not be invoked.
   *
   * @param cause failure cause
   * @param event the event that was to be persisted
   */
  protected def onPersistRejected(cause: Throwable, event: Any, seqNr: Long): Unit = {
...

也可以通過函數重載來自定義狀態恢復行為:

trait PersistenceRecovery {
  //#persistence-recovery
  /**
   * Called when the persistent actor is started for the first time.
   * The returned [[Recovery]] object defines how the Actor will recover its persistent state before
   * handling the first incoming message.
   *
   * To skip recovery completely return `Recovery.none`.
   */
  def recovery: Recovery = Recovery()
  //#persistence-recovery
}

整個狀態恢復過程是在EventSourced.scala裏下面這個函數實現的:

override def stateReceive(receive: Receive, message: Any) = try message match {
        case ReplayedMessage(p) ?
          try {
            eventSeenInInterval = true
            updateLastSequenceNr(p)
            Eventsourced.super.aroundReceive(recoveryBehavior, p)
          } catch {
            case NonFatal(t) ?
              timeoutCancellable.cancel()
              try onRecoveryFailure(t, Some(p.payload)) finally context.stop(self)
              returnRecoveryPermit()
          }
        case RecoverySuccess(highestSeqNr) ?
          timeoutCancellable.cancel()
          onReplaySuccess() // callback for subclass implementation
          sequenceNr = highestSeqNr
          setLastSequenceNr(highestSeqNr)
          _recoveryRunning = false
          try Eventsourced.super.aroundReceive(recoveryBehavior, RecoveryCompleted)
          finally transitToProcessingState()
        case ReplayMessagesFailure(cause) ?
          timeoutCancellable.cancel()
          try onRecoveryFailure(cause, event = None) finally context.stop(self)
        case RecoveryTick(false) if !eventSeenInInterval ?
          timeoutCancellable.cancel()
          try onRecoveryFailure(
            new RecoveryTimedOut(s"Recovery timed out, didn‘t get event within $timeout, highest sequence number seen $lastSequenceNr"),
            event = None)
          finally context.stop(self)
        case RecoveryTick(false) ?
          eventSeenInInterval = false
        case RecoveryTick(true) ?
        // snapshot tick, ignore
        case other ?
          stashInternally(other)
      } catch {
        case NonFatal(e) ?
          returnRecoveryPermit()
          throw e
      }

函數通過super.aroundReceive把消息傳給了receiveRecovery:

/**
   * INTERNAL API.
   *
   * Can be overridden to intercept calls to this actor‘s current behavior.
   *
   * @param receive current behavior.
   * @param msg current message.
   */
  protected[akka] def aroundReceive(receive: Actor.Receive, msg: Any): Unit = {
    // optimization: avoid allocation of lambda
    if (receive.applyOrElse(msg, Actor.notHandledFun).asInstanceOf[AnyRef] eq Actor.NotHandled) {
      unhandled(msg)
    }
  }

因為EventSourced繼承了PersistenceRecovery trait,所以重載recovery函數可以改變狀態恢復行為。默認的模式是:

/**
 * Recovery mode configuration object to be returned in [[PersistentActor#recovery]].
 *
 * By default recovers from latest snapshot replays through to the last available event (last sequenceId).
 *
 * Recovery will start from a snapshot if the persistent actor has previously saved one or more snapshots
 * and at least one of these snapshots matches the specified `fromSnapshot` criteria.
 * Otherwise, recovery will start from scratch by replaying all stored events.
 *
 * If recovery starts from a snapshot, the persistent actor is offered that snapshot with a [[SnapshotOffer]]
 * message, followed by replayed messages, if any, that are younger than the snapshot, up to the
 * specified upper sequence number bound (`toSequenceNr`).
 *
 * @param fromSnapshot criteria for selecting a saved snapshot from which recovery should start. Default
 *                     is latest (= youngest) snapshot.
 * @param toSequenceNr upper sequence number bound (inclusive) for recovery. Default is no upper bound.
 * @param replayMax maximum number of messages to replay. Default is no limit.
 */
@SerialVersionUID(1L)
final case class Recovery(
  fromSnapshot: SnapshotSelectionCriteria = SnapshotSelectionCriteria.Latest,
  toSequenceNr: Long                      = Long.MaxValue,
  replayMax:    Long                      = Long.MaxValue)

下面是狀態恢復過程中產生的消息:

/**
 * Sent to a [[PersistentActor]] when the journal replay has been finished.
 */
@SerialVersionUID(1L)
case object RecoveryCompleted extends RecoveryCompleted {
...
final class RecoveryTimedOut(message: String) extends RuntimeException(message) with NoStackTrace

狀態恢復的進程可用用下面的方法檢測:

/**
   * Returns `true` if this persistent actor is currently recovering.
   */
  def recoveryRunning: Boolean = {
    // currentState is null if this is called from constructor
    if (currentState == null) true else currentState.recoveryRunning
  }

  /**
   * Returns `true` if this persistent actor has successfully finished recovery.
   */
  def recoveryFinished: Boolean = !recoveryRunning

用戶也可以刪除journal裏的事件。雖然應該作為原始資料完整保存不應該鼓勵這麽做:

/**
   * Permanently deletes all persistent messages with sequence numbers less than or equal `toSequenceNr`.
   *
   * If the delete is successful a [[DeleteMessagesSuccess]] will be sent to the actor.
   * If the delete fails a [[DeleteMessagesFailure]] will be sent to the actor.
   *
   * @param toSequenceNr upper sequence number bound of persistent messages to be deleted.
   */
  def deleteMessages(toSequenceNr: Long): Unit =
    journal ! DeleteMessagesTo(persistenceId, toSequenceNr, self)

刪除事件範圍是用SequenceNr來代表的,下面是一些可用的序號:

/**
   * Returns `persistenceId`.
   */
  override def snapshotterId: String = persistenceId

  /**
   * Highest received sequence number so far or `0L` if this actor hasn‘t replayed
   * or stored any persistent events yet.
   */
  def lastSequenceNr: Long = _lastSequenceNr

  /**
   * Returns `lastSequenceNr`.
   */
  def snapshotSequenceNr: Long = lastSequenceNr

事件刪除過程可用下面的消息監控:

/**
 * Reply message to a successful [[Eventsourced#deleteMessages]] request.
 */
final case class DeleteMessagesSuccess(toSequenceNr: Long)

/**
 * Reply message to a failed [[Eventsourced#deleteMessages]] request.
 */
final case class DeleteMessagesFailure(cause: Throwable, toSequenceNr: Long)

下面是一些快照的持久化維護方法:

/**
   * Snapshotter id.
   */
  def snapshotterId: String

  /**
   * Sequence number to use when taking a snapshot.
   */
  def snapshotSequenceNr: Long

  /**
   * Instructs the snapshot store to load the specified snapshot and send it via an [[SnapshotOffer]]
   * to the running [[PersistentActor]].
   */
  def loadSnapshot(persistenceId: String, criteria: SnapshotSelectionCriteria, toSequenceNr: Long) =
    snapshotStore ! LoadSnapshot(persistenceId, criteria, toSequenceNr)

  /**
   * Saves a `snapshot` of this snapshotter‘s state.
   *
   * The [[PersistentActor]] will be notified about the success or failure of this
   * via an [[SaveSnapshotSuccess]] or [[SaveSnapshotFailure]] message.
   */
  def saveSnapshot(snapshot: Any): Unit = {
    snapshotStore ! SaveSnapshot(SnapshotMetadata(snapshotterId, snapshotSequenceNr), snapshot)
  }

  /**
   * Deletes the snapshot identified by `sequenceNr`.
   *
   * The [[PersistentActor]] will be notified about the status of the deletion
   * via an [[DeleteSnapshotSuccess]] or [[DeleteSnapshotFailure]] message.
   */
  def deleteSnapshot(sequenceNr: Long): Unit = {
    snapshotStore ! DeleteSnapshot(SnapshotMetadata(snapshotterId, sequenceNr))
  }

  /**
   * Deletes all snapshots matching `criteria`.
   *
   * The [[PersistentActor]] will be notified about the status of the deletion
   * via an [[DeleteSnapshotsSuccess]] or [[DeleteSnapshotsFailure]] message.
   */
  def deleteSnapshots(criteria: SnapshotSelectionCriteria): Unit = {
    snapshotStore ! DeleteSnapshots(snapshotterId, criteria)
  }

快照維護數據結構和消息如下:

/**
 * Snapshot metadata.
 *
 * @param persistenceId id of persistent actor from which the snapshot was taken.
 * @param sequenceNr sequence number at which the snapshot was taken.
 * @param timestamp time at which the snapshot was saved, defaults to 0 when unknown.
 */
@SerialVersionUID(1L) //#snapshot-metadata
final case class SnapshotMetadata(persistenceId: String, sequenceNr: Long, timestamp: Long = 0L)
//#snapshot-metadata

object SnapshotMetadata {
  implicit val ordering: Ordering[SnapshotMetadata] = Ordering.fromLessThan[SnapshotMetadata] { (a, b) ?
    if (a eq b) false
    else if (a.persistenceId != b.persistenceId) a.persistenceId.compareTo(b.persistenceId) < 0
    else if (a.sequenceNr != b.sequenceNr) a.sequenceNr < b.sequenceNr
    else if (a.timestamp != b.timestamp) a.timestamp < b.timestamp
    else false
  }
}

/**
 * Sent to a [[PersistentActor]] after successful saving of a snapshot.
 *
 * @param metadata snapshot metadata.
 */
@SerialVersionUID(1L)
final case class SaveSnapshotSuccess(metadata: SnapshotMetadata)
  extends SnapshotProtocol.Response

/**
 * Sent to a [[PersistentActor]] after successful deletion of a snapshot.
 *
 * @param metadata snapshot metadata.
 */
@SerialVersionUID(1L)
final case class DeleteSnapshotSuccess(metadata: SnapshotMetadata)
  extends SnapshotProtocol.Response

/**
 * Sent to a [[PersistentActor]] after successful deletion of specified range of snapshots.
 *
 * @param criteria snapshot selection criteria.
 */
@SerialVersionUID(1L)
final case class DeleteSnapshotsSuccess(criteria: SnapshotSelectionCriteria)
  extends SnapshotProtocol.Response

/**
 * Sent to a [[PersistentActor]] after failed saving of a snapshot.
 *
 * @param metadata snapshot metadata.
 * @param cause failure cause.
 */
@SerialVersionUID(1L)
final case class SaveSnapshotFailure(metadata: SnapshotMetadata, cause: Throwable)
  extends SnapshotProtocol.Response

/**
 * Sent to a [[PersistentActor]] after failed deletion of a snapshot.
 *
 * @param metadata snapshot metadata.
 * @param cause failure cause.
 */
@SerialVersionUID(1L)
final case class DeleteSnapshotFailure(metadata: SnapshotMetadata, cause: Throwable)
  extends SnapshotProtocol.Response

/**
 * Sent to a [[PersistentActor]] after failed deletion of a range of snapshots.
 *
 * @param criteria snapshot selection criteria.
 * @param cause failure cause.
 */
@SerialVersionUID(1L)
final case class DeleteSnapshotsFailure(criteria: SnapshotSelectionCriteria, cause: Throwable)
  extends SnapshotProtocol.Response

/**
 * Offers a [[PersistentActor]] a previously saved `snapshot` during recovery. This offer is received
 * before any further replayed messages.
 */
@SerialVersionUID(1L)
final case class SnapshotOffer(metadata: SnapshotMetadata, snapshot: Any)

/**
 * Selection criteria for loading and deleting snapshots.
 *
 * @param maxSequenceNr upper bound for a selected snapshot‘s sequence number. Default is no upper bound,
 *   i.e. `Long.MaxValue`
 * @param maxTimestamp upper bound for a selected snapshot‘s timestamp. Default is no upper bound,
 *   i.e. `Long.MaxValue`
 * @param minSequenceNr lower bound for a selected snapshot‘s sequence number. Default is no lower bound,
 *   i.e. `0L`
 * @param minTimestamp lower bound for a selected snapshot‘s timestamp. Default is no lower bound,
 *   i.e. `0L`
 *
 * @see [[Recovery]]
 */
@SerialVersionUID(1L)
final case class SnapshotSelectionCriteria(
  maxSequenceNr: Long = Long.MaxValue,
  maxTimestamp:  Long = Long.MaxValue,
  minSequenceNr: Long = 0L,
  minTimestamp:  Long = 0L) {

  /**
   * INTERNAL API.
   */
  private[persistence] def limit(toSequenceNr: Long): SnapshotSelectionCriteria =
    if (toSequenceNr < maxSequenceNr) copy(maxSequenceNr = toSequenceNr) else this

  /**
   * INTERNAL API.
   */
  private[persistence] def matches(metadata: SnapshotMetadata): Boolean =
    metadata.sequenceNr <= maxSequenceNr && metadata.timestamp <= maxTimestamp &&
      metadata.sequenceNr >= minSequenceNr && metadata.timestamp >= minTimestamp
}

object SnapshotSelectionCriteria {
  /**
   * The latest saved snapshot.
   */
  val Latest = SnapshotSelectionCriteria()

  /**
   * No saved snapshot matches.
   */
  val None = SnapshotSelectionCriteria(0L, 0L)

  /**
   * Java API.
   */
  def create(maxSequenceNr: Long, maxTimestamp: Long) =
    SnapshotSelectionCriteria(maxSequenceNr, maxTimestamp)

  /**
   * Java API.
   */
  def create(maxSequenceNr: Long, maxTimestamp: Long,
             minSequenceNr: Long, minTimestamp: Long) =
    SnapshotSelectionCriteria(maxSequenceNr, maxTimestamp, minSequenceNr, minTimestamp)

  /**
   * Java API.
   */
  def latest() = Latest

  /**
   * Java API.
   */
  def none() = None
}

/**
 * Plugin API: a selected snapshot matching [[SnapshotSelectionCriteria]].
 *
 * @param metadata snapshot metadata.
 * @param snapshot snapshot.
 */
final case class SelectedSnapshot(metadata: SnapshotMetadata, snapshot: Any)

object SelectedSnapshot {
  /**
   * Java API, Plugin API.
   */
  def create(metadata: SnapshotMetadata, snapshot: Any): SelectedSnapshot =
    SelectedSnapshot(metadata, snapshot)
}

/**
 * INTERNAL API.
 *
 * Defines messages exchanged between persistent actors and a snapshot store.
 */
private[persistence] object SnapshotProtocol {

  /** Marker trait shared by internal snapshot messages. */
  sealed trait Message extends Protocol.Message
  /** Internal snapshot command. */
  sealed trait Request extends Message
  /** Internal snapshot acknowledgement. */
  sealed trait Response extends Message

  /**
   * Instructs a snapshot store to load a snapshot.
   *
   * @param persistenceId persistent actor id.
   * @param criteria criteria for selecting a snapshot from which recovery should start.
   * @param toSequenceNr upper sequence number bound (inclusive) for recovery.
   */
  final case class LoadSnapshot(persistenceId: String, criteria: SnapshotSelectionCriteria, toSequenceNr: Long)
    extends Request

  /**
   * Response message to a [[LoadSnapshot]] message.
   *
   * @param snapshot loaded snapshot, if any.
   */
  final case class LoadSnapshotResult(snapshot: Option[SelectedSnapshot], toSequenceNr: Long)
    extends Response

  /**
   * Reply message to a failed [[LoadSnapshot]] request.
   * @param cause failure cause.
   */
  final case class LoadSnapshotFailed(cause: Throwable) extends Response

  /**
   * Instructs snapshot store to save a snapshot.
   *
   * @param metadata snapshot metadata.
   * @param snapshot snapshot.
   */
  final case class SaveSnapshot(metadata: SnapshotMetadata, snapshot: Any)
    extends Request

  /**
   * Instructs snapshot store to delete a snapshot.
   *
   * @param metadata snapshot metadata.
   */
  final case class DeleteSnapshot(metadata: SnapshotMetadata)
    extends Request

  /**
   * Instructs snapshot store to delete all snapshots that match `criteria`.
   *
   * @param persistenceId persistent actor id.
   * @param criteria criteria for selecting snapshots to be deleted.
   */
  final case class DeleteSnapshots(persistenceId: String, criteria: SnapshotSelectionCriteria)
    extends Request
}

篇幅所限,我們將在下一篇用一個具體的應用例子來進行akka-CQRS寫端示範。

Akka-CQRS(1)- Write-side, Persisting event sources:CQRS存寫端操作方式