1. 程式人生 > >Kafka設計解析(二十一)關於Kafka冪等producer的討論

Kafka設計解析(二十一)關於Kafka冪等producer的討論

不同 .cn stat ali 就是 額外 ber nec local

轉載自 huxihx,原文鏈接 關於Kafka冪等producer的討論

眾所周知,Kafka 0.11.0.0版本正式支持精確一次處理語義(exactly once semantics,下稱EOS)。Kafka的EOS主要體現在3個方面:

  • 冪等producer:保證發送單個分區的消息只會發送一次,不會出現重復消息
  • 事務(transaction):保證原子性地寫入到多個分區,即寫入到多個分區的消息要麽全部成功,要麽全部回滾
  • 流處理EOS:流處理本質上可看成是“讀取-處理-寫入”的管道。此EOS保證整個過程的操作是原子性。註意,這只適用於Kafka Streams

上面3種EOS語義有著不同的應用範圍,冪等producr只能保證單分區

上無重復消息;事務可以保證多分區寫入消息的完整性;而流處理EOS保證的是端到端(E2E)消息處理的EOS。用戶在使用過程中需要根據自己的需求選擇不同的EOS。以下是啟用方法:

  • 啟用冪等producer:在producer程序中設置屬性enable.idempotence=true,但不要設置transactional.id。註意是不要設置,而不是設置成空字符串或"null"
  • 啟用事務支持:在producer程序中設置屬性transcational.id為一個指定字符串(你可以認為這是你的事務名稱,故最好起個有意義的名字),同時設置enable.idempotence=true
  • 啟用流處理EOS:在Kafka Streams程序中設置processing.guarantee=exactly_once

本文主要討論冪等producer的設計與實現。

所謂冪等producer指producer.send的邏輯是冪等的,即發送相同的Kafka消息,broker端不會重復寫入消息。同一條消息Kafka保證底層日誌中只會持久化一次,既不會丟失也不會重復。冪等性可以極大地減輕下遊consumer系統實現消息去重的工作負擔,因此是非常實用的功能。值得註意的是,冪等producer提供的語義保證是有條件的:

  • 單分區冪等性:冪等producer無法實現多分區上的冪等性。如前所述,若要實現多分區上的原子性,需要引入事務
  • 單會話冪等性:冪等producer無法跨會話實現冪等性。即使同一個producer宕機並重啟也無法保證消息的EOS語義

雖然有上面兩個限制,冪等producer依然是一個非常實用的新功能。下面我們來討論下它的設計原理。如果要實現冪等性, 通常都需要花費額外的空間來保存狀態以執行消息去重。Kafka的冪等producer整體上也是這樣的思想。

首先,producer對象引入了一個新的字段:Producer ID(下稱PID),它唯一標識一個producer,當producer啟動時Kafka會為每個producer分配一個PID(64位整數),因此PID的生成和分配對用戶來說是完全透明的,用戶無需考慮PID的事情,甚至都感受不到PID的存在。其次,0.11 Kafka重構了消息格式(有興趣的參見Kafka 0.11消息設計),引入了序列號字段(sequence number,下稱seq number)來標識某個PID producer發送的消息。和consumer端的offset類似,seq number從0開始計數並嚴格單調增加。同時在broker端會為每個PID(即每個producer)保存該producer發送過來的消息batch的某些元信息,比如PID信息、消息batch的起始seq number及結束seq number等。這樣每當該PID發送新的消息batch時,Kafka broker就會對比這些信息,如果發生沖突(比如起始seq number和結束seq number與當前緩存的相同),那麽broker就會拒絕這次寫入請求。倘若沒有沖突,那麽broker端就會更新這部分緩存然後再開始寫入消息。這就是Kafka實現冪等producer的設計思路:1. 為每個producer設置唯一的PID;2. 引入seq number以及broker端seq number緩存更新機制來去重。

介紹了設計思想,我們來看下具體的實現,如下圖所示:

技術分享圖片

以前的博客中提到過,Java producer(區別於Scala producer)是雙線程的設計,分為KafkaProducer用戶主線程和Sender線程。前者調用send方法將消息寫入到producer的內存緩沖區中,即RecordAccumulator中,而後者會定期地從RecordAccumulator中獲取消息並將消息歸入不同的batch中發送到對應的broker上。在冪等producer中,用戶主線程的邏輯變動不大。send方法依然是將消息寫入到RecordAccumulator。而Sender線程卻有著很大的改動。我們首先來看下上圖中的第一步:發送InitProducerIdRequest請求。

InitProducerIdRequest是0.11.0.0版本新引入的請求類型,它由兩個字段組成:transactionalId和timeout,其中transactionalId就是producer端參數transactional.Id的值,timeout則是事務的超時時間。由於我們未引入事務而只是配置冪等producer,故transcationalId為null,而timeout則設置成了Int.MAX,即Sender線程將一直阻塞直到broker端發送PID返回。一旦接收到broker端返回的response,Sender線程就會更新該producer的PID字段。有興趣的讀者可以參考源碼:Sender.maybeWaitForProducerId,如下圖所示:

技術分享圖片

(下面我就不貼源碼了,但會給出對應的源碼文件,有興趣的直接看吧~~)

上圖中, 第一步是隨機尋找一個負載最低的broker,即當前未完成請求數最少的broker。由此可見,InitProducerIdRequest和MetadataRequest一樣,都可由任意的broker完成處理。至於為什麽我們稍後討論,現在先來討論下broker端是如何確定PID的。其實說起來很簡單,Kafka在Zookeeper中新引入了一個節點:/latest_producer_id_block,broker啟動時提前預分配一段PID,當前是0~999,即提前分配出1000個PID來,如下所示:

[zk: localhost:2181(CONNECTED) 2] get /latest_producer_id_block

{"version":1,"broker":0,"block_start":"0","block_end":"999"}

一旦PID超過了999,則目前會按照1000的步長重新分配,到時候就應該是這個樣子:

{"version":1, "broker":0,"block_start":"1000","block_end":"1999"}

除了上面的信息,broker在內存中還保存了下一個待分配的PID。這樣,當broker端接收到InitProducerIdRequest請求後,它會比較下一個PID是否在當前預分配的PID範圍:若是則直接返回;否則再次預分配下一批的PID。現在我們來討論下為什麽這個請求所有broker都能響應——原因就在於集群中所有broker啟動時都會啟動一個叫TransactionCoordinator的組件,該組件能夠執行預分配PID塊和分配PID的工作,而所有broker都使用/latest_producer_id_block節點來保存PID塊,因此任意一個broker都能響應這個請求。

上圖中的第二步就是發送InitProducerIdRequest的方法,註意當前是同步等待返回結果,即Sender線程會無限阻塞直到broker端返回response(當然依然會受制於request.timeout.ms參數的影響)。當拿到response後,Sender線程就會更新該producer的PID字段,如圖中第三步所示。

確定了PID之後,Sender線程會調用RecordAccumulator.drain()提取當前可發送的消息,在該方法中會將PID,Seq number等信息封裝進消息batch中,具體代碼參見:RecordAccumulator.java#drain()。一旦獲取到消息batch後,Sender線程開始構建ProduceRequest請求然後發送給broker端。至此producer端的工作就算告一段落了。

下面我們看下broker端是如何響應PRODUCE請求。實際上,broker最重要的事情就是要區別某個PID的同一個消息batch是否重復發送了。因此在消息被寫入到leader底層日誌之前必須要先做一次判斷,即PRODUCE請求中的消息batch是否已然被處理過,判斷的邏輯就在:ProducerStateManager.scala中的ProducerAppendInfo#validateAppend方法中。如果請求中包含的消息batch與最近一次成功寫入的batch相同(即PID相同,batch起始seq number和batch結束seq number都相同),那麽該方法便拋出DuplicateSequenceNumberException,然後由上層方法捕獲到該異常封裝進ProduceResponse返回。如果batch不相同,則允許此次寫入,並在寫入完成後更新這些producer信息。

值得一提的是在0.11.0.0版本中DuplicateSequenceNumberException繼承自RetriableException類,即表示Kafka認為它是一個可重試的異常。這其實是個問題,因為拋出該異常已經表明broker不需要處理這次寫入,即使重試broker依然會拒絕,因此在1.0.0版本中該類已經不再繼承自RetriableException,順便還改了個名字:DuplicateSequenceException。

以上就是關於冪等producer的一些討論。從上面的分析中我們可以看到冪等producer的設計思想主要是基於用空間保存狀態並利用狀態來去重的思想。了解了這一點,你會發現冪等producer的設計以及代碼改動實際上非常容易理解。

最後再說一點:以上所說的冪等producer一直強調的是“精確處理一次”的語義,實際上冪等producer還有“不亂序”的強語義保證,只不過在0.11版本中這種不亂序主要是通過設置enable.idempotence=true時強行將max.in.flight.requests.per.connection設置成1來實現的。這種實現雖然保證了消息不亂序,但也在某種程度上降低了producer的TPS。據我所知,這個問題將在1.0.0版本中已然得到解決。在後續的Kafka 1.0.0版本中即使啟用了冪等producer也能維持max.in.flight.requests.per.connection > 1,具體的算法我還沒有看,不過總之是個好消息。至於表現如何就讓我們拭目以待吧~~

Kafka設計解析(二十一)關於Kafka冪等producer的討論