1. 程式人生 > >kafka系列九、kafka事務原理及使用場景

kafka系列九、kafka事務原理及使用場景

一、事務場景

1.最簡單的需求是producer發的多條訊息組成一個事務這些訊息需要對consumer同時可見或者同時不可見 。
2.producer可能會給多個topic,多個partition發訊息,這些訊息也需要能放在一個事務裡面,這就形成了一個典型的分散式事務。
3.kafka的應用場景經常是應用先消費一個topic,然後做處理再發到另一個topic,這個consume-transform-produce過程需要放到一個事務裡面,比如在訊息處理或者傳送的過程中如果失敗了,消費位點也不能提交。
4.producer或者producer所在的應用可能會掛掉,新的producer啟動以後需要知道怎麼處理之前未完成的事務 。
5.流式處理的拓撲可能會比較深,如果下游只有等上游訊息事務提交以後才能讀到,可能會導致rt非常長吞吐量也隨之下降很多,所以需要實現read committed和read uncommitted兩種事務隔離級別。

二、幾個關鍵概念和推導

1.因為producer傳送訊息可能是分散式事務,所以引入了常用的2PC,所以有事務協調者(Transaction Coordinator)。Transaction Coordinator和之前為了解決腦裂和驚群問題引入的Group Coordinator在選舉和failover上面類似。

2.事務管理中事務日誌是必不可少的,kafka使用一個內部topic來儲存事務日誌,這個設計和之前使用內部topic儲存位點的設計保持一致。事務日誌是Transaction Coordinator管理的狀態的持久化,因為不需要回溯事務的歷史狀態,所以事務日誌只用儲存最近的事務狀態。
3.因為事務存在commit和abort兩種操作,而客戶端又有read committed和read uncommitted兩種隔離級別,所以訊息佇列必須能標識事務狀態,這個被稱作Control Message。
4.producer掛掉重啟或者漂移到其它機器需要能關聯的之前的未完成事務所以需要有一個唯一識別符號來進行關聯,這個就是TransactionalId,一個producer掛了,另一個有相同TransactionalId的producer能夠接著處理這個事務未完成的狀態。注意不要把TransactionalId和資料庫事務中常見的transaction id搞混了,kafka目前沒有引入全域性序,所以也沒有transaction id,這個TransactionalId是使用者提前配置的。
5. TransactionalId能關聯producer,也需要避免兩個使用相同TransactionalId的producer同時存在,所以引入了producer epoch來保證對應一個TransactionalId只有一個活躍的producer epoch

三、事務語義

2.1.  多分割槽原子寫入

事務能夠保證Kafka topic下每個分割槽的原子寫入。事務中所有的訊息都將被成功寫入或者丟棄。例如,處理過程中發生了異常並導致事務終止,這種情況下,事務中的訊息都不會被Consumer讀取。現在我們來看下Kafka是如何實現原子的“讀取-處理-寫入”過程的。

首先,我們來考慮一下原子“讀取-處理-寫入”週期是什麼意思。簡而言之,這意味著如果某個應用程式在某個topic tp0的偏移量X處讀取到了訊息A,並且在對訊息A進行了一些處理(如B = F(A))之後將訊息B寫入topic tp1,則只有當訊息A和B被認為被成功地消費並一起釋出,或者完全不釋出時,整個讀取過程寫入操作是原子的。

現在,只有當訊息A的偏移量X被標記為消耗時,訊息A才被認為是從topic tp0消耗的,消費到的資料偏移量(record offset)將被標記為提交偏移量(Committing offset)。在Kafka中,我們通過寫入一個名為offsets topic的內部Kafka topic來記錄offset commit。訊息僅在其offset被提交給offsets topic時才被認為成功消費。

由於offset commit只是對Kafkatopic的另一次寫入,並且由於訊息僅在提交偏移量時被視為成功消費,所以跨多個主題和分割槽的原子寫入也啟用原子“讀取-處理-寫入”迴圈:提交偏移量X到offset topic和訊息B到tp1的寫入將是單個事務的一部分,所以整個步驟都是原子的。

2.2.  粉碎“殭屍例項”

我們通過為每個事務Producer分配一個稱為transactional.id的唯一識別符號來解決殭屍例項的問題。在程序重新啟動時能夠識別相同的Producer例項。

API要求事務性Producer的第一個操作應該是在Kafka叢集中顯示註冊transactional.id。 當註冊的時候,Kafka broker用給定的transactional.id檢查開啟的事務並且完成處理。 Kafka也增加了一個與transactional.id相關的epoch。Epoch儲存每個transactional.id內部元資料。

一旦這個epoch被觸發,任何具有相同的transactional.id和更舊的epoch的Producer被視為殭屍,並被圍起來, Kafka會拒絕來自這些Procedure的後續事務性寫入。

2.3.  讀事務訊息

現在,讓我們把注意力轉向資料讀取中的事務一致性。

Kafka Consumer只有在事務實際提交時才會將事務訊息傳遞給應用程式。也就是說,Consumer不會提交作為整個事務一部分的訊息,也不會提交屬於中止事務的訊息。

值得注意的是,上述保證不足以保證整個訊息讀取的原子性,當使用Kafka consumer來消費來自topic的訊息時,應用程式將不知道這些訊息是否被寫為事務的一部分,因此他們不知道事務何時開始或結束;此外,給定的Consumer不能保證訂閱屬於事務一部分的所有Partition,並且無法發現這一點,最終難以保證作為事務中的所有訊息被單個Consumer處理。

簡而言之:Kafka保證Consumer最終只能提供非事務性訊息或提交事務性訊息。它將保留來自未完成事務的訊息,並過濾掉已中止事務的訊息。

四 、事務處理Java API

事務功能主要是一個伺服器端和協議級功能,任何支援它的客戶端庫都可以使用它。 一個Java編寫的使用Kafka事務處理API的“讀取-處理-寫入”應用程式示例: