1. 程式人生 > >Spring整合Kafka中的事務

Spring整合Kafka中的事務

       原文連結:https://docs.spring.io/spring-kafka/reference/htmlsingle/#transactions

事務Transactions

       Kafka0.11.0.0版本客戶端提供了事務支援。Spring for Apache Kafka通過如下幾種方式提供事務支援:

  • KafkaTransactionManager-和普通的Spring事務支援一起使用(@Transactional,TransactionTemplate等等)。
  • 事務性的KafkaMessageListenerContainer
  • 通過KafkaTemplate實現本地事務。

       通過給DefaultKafkaProducerFactory提供一個事務id字首transactionIdPrefix開啟事務。開啟事務後,生產者工廠快取一些事務性的生產者(transactional producers),而不是管理一個共享的生產者Producer。當用戶使用close()方法關閉一個生產者時,它並沒有真正被關閉,而是被放回快取中複用。每個生產者的transactional.id

屬性就是transactionIdPrefix+n,n從0開始,每個生產者自增。

Kafka事務管理器KafkaTransactionManager

       KafkaTransactionManager是Spring框架中的平臺事務管理器PlatformTransactionManager介面的實現,在KafkaTransactionManager構造器中需要提供一個生產者工廠引用。如果你提供一個自定義的生產者工廠,它必須支援事務,參考ProducerFactory.transactionCapable()
你可以和Spring事務支援(@Transactional,TransactionTemplate等等)一起使用KafkaTransactionManager

。如果一個事務開啟了,任何事務內的KafkaTemplate操作都將使用這個事務內的生產者Producer。事務管理器將根據成功或失敗來決定提交還是回滾事務。注意KafkaTemplate必須和事務管理器使用同樣的生產者工廠ProducerFactory

事務性的監聽器容器Transactional Listener Container

       你可以給監聽器容器(listener container)提供一個KafkaTransactionManager例項,當這麼配置的時候,容器在呼叫監聽器之前會開啟事務。如果監聽器成功處理一條記錄(或者一批記錄,當使用BatchMessageListener的時候),容器將在事務管理器提交事務前,使用producer.sendOffsetsToTransaction())給事務傳送偏移量(offset(s))。如果監聽器丟擲異常,事務會回滾,下次拉取(poll)的時候消費者仍可以消費到之前出錯的記錄。

事務同步Transaction Synchronization

       如果你需要用其他事務來同步Kafka事務,只需要簡單地給監聽器容器(listener container)配置合適的事務管理器(一個支援同步的事務管理器,比如DataSourceTransactionManager)。