上篇介紹了kafka at-least-once消費模式。kafka消費模式以commit-offset的時間節點代表不同的消費模式,分別是:at-least-once, at-most-once, exactly-once。上篇介紹的at-least-once消費模式是通過kafka自身的auto-commit實現的。事後想了想,這個應該算是at-most-once模式,因為消費過程不會影響auto-commit,kafka在每個設定的間隔都會自動進行offset-commit。如果這個間隔夠短,比整個消費過程短,那麼在完成消費過程前就已經儲存了offset,所以是at-most-once模式。不過,如果確定這個間隔一定大於消費過程,那麼又變成了at-least-once模式。具體能實現什麼消費模式並不能明確,因為auto-commit是無法從外部進行控制的。看來實現正真意義上的at-least-once消費模式還必須取得offset-commit的控制權才行。

alpakka-kafka提供了一種CommittableSource:

  1. def committableSource[K, V](settings: ConsumerSettings[K, V],
  2. subscription: Subscription): Source[CommittableMessage[K, V], Control] {...}

從這個CommittableSource輸出的元素是CommittableMessage[K,V]:

  1. final case class CommittableMessage[K, V](
  2. record: ConsumerRecord[K, V],
  3. committableOffset: CommittableOffset
  4. )

這個CommittableMessage除原始訊息之外還提供了CommittableOffset。通過Flow或Sink都可以進行offset-commit。alpakka-kafka提供了Committer,通過Committer.sink, Committer.Flow幫助實現offset-commit,Committer.flow如下:

  1. Consumer
  2. .committableSource(consumerSettings, Subscriptions.topics(topic))
  3. .mapAsync(1) { msg =>
  4. updateStock.map(_ => msg.committableOffset)
  5. }
  6. .via(Committer.flow(committerDefaults.withMaxBatch(1)))
  7. .to(Sink.seq)
  8. .run()

或Committer.sink:

  1. Consumer
  2. .committableSource(consumerSettings, Subscriptions.topics(topic))
  3. .mapAsync(1) { msg =>
  4. updateStock.map(_ => msg.committableOffset)
  5. }
  6. .toMat(Committer.sink(committerSettings))(Keep.left)
  7. .run()

下面是一個具體的at-least-once示範:

  1. val committerSettings = CommitterSettings(sys).withMaxBatch(commitMaxBatch)
  2.  
  3. val stkTxns = new DocToStkTxns(trace)
  4. val curStk = new CurStk(trace)
  5. val pcmTxns = new PcmTxns(trace)
  6.  
  7. val commitableSource = Consumer
  8. .committableSource(consumerSettings, subscription)
  9.  
  10. def start =
  11. (1 to numReaders).toList.map { _ =>
  12. RestartSource
  13. .onFailuresWithBackoff(restartSource) { () => commitableSource }
  14. // .viaMat(KillSwitches.single)(Keep.right)
  15. .async.mapAsync(1) { msg =>
  16. for {
  17. _ <- FastFuture.successful {
  18. log.step(s"AtLeastOnceReaderGroup-msg: ${msg.record}")(Messages.MachineId("", ""))
  19. }
  20. _ <- stkTxns.docToStkTxns(msg.record.value())
  21. pmsg <- FastFuture.successful {
  22. log.step(s"AtLeastOnceReaderGroup-docToStkTxns: ${msg.record}")(Messages.MachineId("", ""))
  23. msg
  24. }
  25. } yield pmsg
  26. }
  27. .async.mapAsync(1) { msg =>
  28. for {
  29. curstks <- curStk.updateStk(msg.record.value())
  30. pmsg <- FastFuture.successful {
  31. log.step(s"AtLeastOnceReaderGroup-updateStk: curstks-$curstks")(Messages.MachineId("", ""))
  32. msg
  33. }
  34. } yield pmsg
  35. }
  36. .async.mapAsync(1) { msg =>
  37. for {
  38. pcm <- pcmTxns.writePcmTxn(msg.record.value())
  39. pmsg <- FastFuture.successful {
  40. log.step(s"AtLeastOnceReaderGroup-updateStk: writePcmTxn-$pcm")(Messages.MachineId("", ""))
  41. msg
  42. }
  43. } yield pmsg
  44. }
  45. .async.mapAsync(1) { msg =>
  46. for {
  47. _ <- pcmTxns.updatePcm(msg.record.value())
  48. } yield "Completed"
  49. FastFuture.successful(msg.committableOffset)
  50. }
  51. .toMat(Committer.sink(committerSettings))(Keep.left)
  52. .run()
  53. }

消費過程其它部分的設計考慮和實現,如多執行緒、異常處理等可參考上篇討論。

對於at-most-once消費模式的實現,alpakka-kafka提供了atMostOnceSource:

  1. def atMostOnceSource[K, V](settings: ConsumerSettings[K, V],
  2. subscription: Subscription): Source[ConsumerRecord[K, V], Control] = {...}

下面是用這個Source實現at-most-once的示範:

  1. val atmostonceSource = Consumer
  2. .atMostOnceSource(consumerSettings, subscription)
  3.  
  4. def start =
  5. (1 to numReaders).toList.map { _ =>
  6. RestartSource
  7. .onFailuresWithBackoff(restartSource) { () => atmostonceSource }
  8. // .viaMat(KillSwitches.single)(Keep.right)
  9. .async.mapAsync(1) { msg =>
  10. for {
  11. _ <- FastFuture.successful {
  12. log.step(s"AtMostOnceReaderGroup-msg: $msg")(Messages.MachineId("", ""))
  13. }
  14. _ <- stkTxns.docToStkTxns(msg.value())
  15. pmsg <- FastFuture.successful {
  16. log.step(s"AtMostOnceReaderGroup-docToStkTxns: $msg")(Messages.MachineId("", ""))
  17. msg
  18. }
  19. } yield pmsg
  20. }
  21. .async.mapAsync(1) { msg =>
  22. for {
  23. _ <- FastFuture.successful {
  24. log.step(s"AtMostOnceReaderGroup-updateStk: msg: $msg")(Messages.MachineId("", ""))
  25. }
  26. curstks <- curStk.updateStk(msg.value())
  27. pmsg<- FastFuture.successful {
  28. log.step(s"AtMostOnceReaderGroup-updateStk: curstks-$curstks")(Messages.MachineId("", ""))
  29. msg
  30. }
  31. } yield pmsg
  32. }
  33. .async.mapAsync(1) { msg =>
  34. for {
  35. _ <- FastFuture.successful {
  36. log.step(s"AtMostOnceReaderGroup-writePcmTxn: msg: $msg")(Messages.MachineId("", ""))
  37. }
  38. pcm <- pcmTxns.writePcmTxn(msg.value())
  39. pmsg <- FastFuture.successful {
  40. log.step(s"AtMostOnceReaderGroup-updateStk: writePcmTxn-$pcm")(Messages.MachineId("", ""))
  41. msg
  42. }
  43. } yield pmsg
  44. }
  45. .async.mapAsync(1) { msg =>
  46. for {
  47. _ <- FastFuture.successful {
  48. log.step(s"AtMostOnceReaderGroup-updatePcm: msg: $msg")(Messages.MachineId("", ""))
  49. }
  50. _ <- pcmTxns.updatePcm(msg.value())
  51. _ <- FastFuture.successful {
  52. log.step(s"AtMostOnceReaderGroup-updateStk: updatePcm-$msg")(Messages.MachineId("", ""))
  53. }
  54. } yield "Completed"
  55. }
  56. .toMat(Sink.seq)(Keep.left)
  57. .run()
  58. }

由於offset-commit和訊息消費是兩個獨立的過程,無論如何努力都無法保證只讀一次,必須把這兩個過程合併成一個才有可能實現。所以,exactly-once可以通過資料庫系統的事務處理transaction-processing來實現,就是把offset-commit和資料更新兩個動作放到同一個事務transaction裡,通過事務處理的ACID原子特性保證兩個動作同進同退的一致性。這也意味著這個exactly-once消費模式必須在一個提供事務處理功能的資料庫系統裡實現,也代表kafka-offset必須和其它交易資料一起存放在同一種資料庫裡。mongodb4.0以上支援事務處理,可以用來作示範。

首先,先研究一下exactly-once模式的框架:

  1. val mergedSource = Consumer
  2. .plainPartitionedManualOffsetSource(consumerSettings,subscription,
  3. loadOffsets)
  4. .flatMapMerge(maxReaders, _._2)
  5. .async.mapAsync(1) { msg =>
  6. for {
  7. cmt <- stkTxns.stkTxnsWithRetry(msg.value(), msg.partition(), msg.offset()).toFuture().map(_ => "Completed")
  8. pmsg <- FastFuture.successful {
  9. log.step(s"ExactlyOnceReaderGroup-stkTxnsWithRetry: committed transaction-$cmt")(Messages.MachineId("", ""))
  10. msg
  11. }
  12. } yield pmsg
  13. }
  14. .mapAsync(1) { msg =>
  15. for {
  16. curstks <- curStk.updateStk(msg.value())
  17. pmsg <- FastFuture.successful {
  18. log.step(s"AtMostOnceReaderGroup-updateStk: curstks-$curstks")(Messages.MachineId("", ""))
  19. msg
  20. }
  21. } yield pmsg
  22. }
  23. .toMat(Sink.seq)(Keep.left)
  24. .run()
  25. }
  26. }

在上面的例子裡使用了plainPartitionedManualOffsetSource:

  1. def plainPartitionedManualOffsetSource[K, V](
  2. settings: ConsumerSettings[K, V],
  3. subscription: AutoSubscription,
  4. getOffsetsOnAssign: Set[TopicPartition] => Future[Map[TopicPartition, Long]],
  5. onRevoke: Set[TopicPartition] => Unit = _ => ()
  6. ): Source[(TopicPartition, Source[ConsumerRecord[K, V], NotUsed]), Control] = {...}

getOffsetsOnAssign提供指定partition的offset(從資料庫裡讀出指定partition的offset值),如下:

  1. private def loadOffsets(partitions: Set[TopicPartition]): Future[Map[TopicPartition,Long]] = {
  2. offsetStore.getOffsets(partitions)
  3. }
  4.  
  5. def getOffsets(partitions: Set[TopicPartition])(
  6. implicit ec: ExecutionContext) = {
  7. log.step(s"OffsetStore-getOffsets: ($partitions)")(Messages.MachineId("", ""))
  8.  
  9. def getOffset(tp: TopicPartition) = {
  10. val query = and(equal(KfkModels.SCHEMA.TOPIC, tp.topic()),
  11. equal(KfkModels.SCHEMA.PARTITION,tp.partition()))
  12. def offset: Future[Seq[Document]] = colOffset.find(query).toFuture()
  13. for {
  14. docs <- offset
  15. ofs <- FastFuture.successful(if(docs.isEmpty) None
  16. else Some(Offsets.fromDocument(docs.head)))
  17. } yield ofs
  18. }
  19. val listFut = partitions.toList.map(getOffset)
  20. val futList: Future[List[Option[KfkModels.Offsets]]] = FastFuture.sequence(listFut)
  21. futList.map { oofs =>
  22. oofs.foldRight(Map[TopicPartition,Long]()){(oof,m) =>
  23. oof match {
  24. case None => m
  25. case ofs => m + (new TopicPartition(ofs.get.topic,ofs.get.partition) -> ofs.get.offset)
  26. }
  27. }
  28. }
  29. }

注意loadOffset的函式型別:  Set[TopicPartition] => Future[Map[TopicPartition, Long]],返回的是個Map[partition,offset]。

另外,plainPartitionedManualSource返回Source[...Source[ConsumerRecord[K, V]],要用flatMapMerge打平:

  1. /**
  2. * Transform each input element into a `Source` of output elements that is
  3. * then flattened into the output stream by merging, where at most `breadth`
  4. * substreams are being consumed at any given time.
  5. *
  6. * '''Emits when''' a currently consumed substream has an element available
  7. *
  8. * '''Backpressures when''' downstream backpressures
  9. *
  10. * '''Completes when''' upstream completes and all consumed substreams complete
  11. *
  12. * '''Cancels when''' downstream cancels
  13. */
  14. def flatMapMerge[T, M](breadth: Int, f: Out => Graph[SourceShape[T], M]): Repr[T] =
  15. map(f).via(new FlattenMerge[T, M](breadth))

引數breadth代表需合併的source數量。

還有,saveOffset和writeStkTxns在同一個事務處理裡:

  1. def docToStkTxns(jsonDoc: String, partition: Int, offset: Long, observable: SingleObservable[ClientSession]) = {
  2. val bizDoc = fromJson[BizDoc](jsonDoc)
  3. log.step(s"TxnalDocToStkTxns-docToStkTxns: $bizDoc")(Messages.MachineId("", ""))
  4.  
  5. observable.map(clientSession => {
  6. val transactionOptions = TransactionOptions.builder()
  7. .readPreference(ReadPreference.primary())
  8. .readConcern(ReadConcern.SNAPSHOT)
  9. .writeConcern(WriteConcern.MAJORITY)
  10. .build()
  11. clientSession.startTransaction(transactionOptions)
  12. val txns = StkTxns.docToTxns(dbStkTxn,dbVtx,dbVendor,bizDoc,trace)
  13. StkTxns.writeStkTxns(clientSession,colStkTxn,colPcm,txns,trace)
  14. offsetStore.saveOffset(clientSession,partition,offset)
  15. clientSession.commitTransaction()
  16. clientSession
  17. })
  18.  
  19. }

注意:mongodb的事務處理必須在複製集replica-set上進行。這也很容易理解,在複製集上才方便交易回滾rollback。

完整的exactly-once實現程式碼如下:

  1. private def loadOffsets(partitions: Set[TopicPartition]): Future[Map[TopicPartition,Long]] = {
  2. offsetStore.getOffsets(partitions)
  3. }
  4.  
  5. val mergedSource = Consumer
  6. .plainPartitionedManualOffsetSource(consumerSettings,subscription,
  7. loadOffsets)
  8. .flatMapMerge(maxReaders, _._2)
  9.  
  10. def start = {
  11. (1 to numReaders).toList.map {_ =>
  12. RestartSource
  13. .onFailuresWithBackoff(restartSource) { () => mergedSource }
  14. // .viaMat(KillSwitches.single)(Keep.right)
  15. .async.mapAsync(1) { msg =>
  16. for {
  17. cmt <- stkTxns.stkTxnsWithRetry(msg.value(), msg.partition(), msg.offset()).toFuture().map(_ => "Completed")
  18. pmsg <- FastFuture.successful {
  19. log.step(s"ExactlyOnceReaderGroup-stkTxnsWithRetry: committed transaction-$cmt")(Messages.MachineId("", ""))
  20. msg
  21. }
  22. } yield pmsg
  23. }
  24. .async.mapAsync(1) { msg =>
  25. for {
  26. curstks <- curStk.updateStk(msg.value())
  27. pmsg <- FastFuture.successful {
  28. log.step(s"AtMostOnceReaderGroup-updateStk: curstks-$curstks")(Messages.MachineId("", ""))
  29. msg
  30. }
  31. } yield pmsg
  32. }
  33. .async.mapAsync(1) { msg =>
  34. for {
  35. pcm <- pcmTxns.writePcmTxn(msg.value())
  36. pmsg <- FastFuture.successful {
  37. log.step(s"AtMostOnceReaderGroup-updateStk: writePcmTxn-$pcm")(Messages.MachineId("", ""))
  38. msg
  39. }
  40. } yield pmsg
  41. }
  42. .async.mapAsync(1) { msg =>
  43. for {
  44. _ <- pcmTxns.updatePcm(msg.value())
  45. } yield "Completed"
  46. }
  47. .toMat(Sink.seq)(Keep.left)
  48. .run()
  49. }
  50. }

只有第一個非同步階段使用了事務處理。也就是說保證了writeStkTxns只執行一次。這個函式的功能主要是把前端產生的交易全部固化。為了避免消費過程中出現異常中斷造成了前端交易的遺失或者重複入賬,必須保證前端交易只固化一次。其它階段的資料處理都是基於已正確固化的交易記錄的。如果出現問題,可以通過重算交易記錄獲取正確的狀態。為了保證平臺執行效率,選擇了不使用事務處理的方式更新資料。