1. 程式人生 > >kafka實現無訊息丟失與精確一次語義(exactly once)處理

kafka實現無訊息丟失與精確一次語義(exactly once)處理

在很多的流處理框架的介紹中,都會說kafka是一個可靠的資料來源,並且推薦使用Kafka當作資料來源來進行使用。這是因為與其他訊息引擎系統相比,kafka提供了可靠的資料儲存及備份機制。並且通過消費者位移這一概念,可以讓消費者在因某些原因宕機而重啟後,可以輕易得回到宕機前的位置。 但其實kafka的可靠性也只能說是相對的,在整條資料鏈條中,總有可以讓資料出現丟失的情況,今天就來討論如何避免kafka資料丟失,以及實現精確一致處理的語義。 # kafka無訊息丟失處理 在討論如何實現kafka無訊息丟失的時候,首先要先清楚大部分情況下訊息丟失是在什麼情況下發生的。為什麼是大部分,因為總有一些非常特殊的情況會被人忽略,而我們只需要關注普遍的情況就足夠了。接下來我們來討論如何較為普遍的資料丟失情況。 ### 1.1 生產者丟失 前面介紹Kafka分割槽和副本的時候,有提到過一個producer客戶端有一個acks的配置,這個配置為0的時候,producer是傳送之後不管的,這個時候就很有可能因為網路等原因造成資料丟失,所以應該儘量避免。但是將ack設定為1就沒問題了嗎,那也不一定,因為有可能在leader副本接收到資料,但還沒同步給其他副本的時候就掛掉了,這時候資料也是丟失了。並且這種時候是客戶端以為訊息傳送成功,但kafka丟失了資料。 **要達到最嚴格的無訊息丟失配置,應該是要將acks的引數設定為-1(也就是all),並且將min.insync.replicas配置項調高到大於1**,這部分內容在上一篇副本機制有介紹[詳細解析kafka之kafka分割槽和副本](https://www.cnblogs.com/listenfwind/p/12465409.html)。 **同時還需要使用帶有回撥的producer api,來發送資料**。注意這裡討論的都是非同步傳送訊息,同步傳送不在討論範圍。 ``` public class send{ ...... public static void main(){ ... /* * 第一個引數是 ProducerRecord 型別的物件,封裝了目標 Topic,訊息的 kv * 第二個引數是一個 CallBack 物件,當生產者接收到 Kafka 發來的 ACK 確認訊息的時候, * 會呼叫此 CallBack 物件的 onCompletion() 方法,實現回撥功能 */ producer.send(new ProducerRecord<>(topic, messageNo, messageStr), new DemoCallBack(startTime, messageNo, messageStr)); ... } ...... } class DemoCallBack implements Callback { /* 開始傳送訊息的時間戳 */ private final long startTime; private final int key; private final String message; public DemoCallBack(long startTime, int key, String message) { this.startTime = startTime; this.key = key; this.message = message; } /** * 生產者成功傳送訊息,收到 Kafka 服務端發來的 ACK 確認訊息後,會呼叫此回撥函式 * @param metadata 生產者傳送的訊息的元資料,如果傳送過程中出現異常,此引數為 null * @param exception 傳送過程中出現的異常,如果傳送成功為 null */ @Override public void onCompletion(RecordMetadata metadata, Exception exception) { long elapsedTime = System.currentTimeMillis() - startTime; if (metadata != null) { System.out.printf("message: (%d, %s) send to partition %d, offset: %d, in %d\n", key, message, metadata.partition(), metadata.offset(), elapsedTime); } else { exception.printStackTrace(); } } } ``` 更詳細的程式碼可以參考這裡:[Kafka生產者分析——KafkaProducer](https://binglau7.github.io/2017/12/18/Kafka%E7%94%9F%E4%BA%A7%E8%80%85%E5%88%86%E6%9E%90%E2%80%94%E2%80%94KafkaProducer/)。 我們之前提到過,producer傳送到kafka broker的時候,是有多種可能會失敗的,而回調函式能準確告訴你是否確認傳送成功,當然這依託於acks和min.insync.replicas的配置。而當資料傳送丟失的時候,就可以進行手動重發或其他操作,從而確保生產者傳送成功。 ### 1.2 kafka內部丟失 有些時候,kafka內部因為一些不大好的配置,可能會出現一些極為隱蔽的資料丟失情況,那麼我們分別討論下大致都有哪幾種情況。 首先是replication.factor配置引數,這個配置決定了副本的數量,預設是1。注意這個引數不能超過broker的數量。說這個引數其實是因為如果使用預設的1,或者不在建立topic的時候指定副本數量(也就是副本數為1),那麼當一臺機器出現磁碟損壞等情況,那麼資料也就從kafka裡面丟失了。**所以replication.factor這個引數最好是配置大於1,比如說3**。 接下來要說的還是和副本相關的,也是上一篇副本中提到的unclean.leader.election.enable 引數,這個引數是在主副本掛掉,然後在ISR集合中沒有副本可以成為leader的時候,要不要讓進度比較慢的副本成為leader的。不用多說,讓進度比較慢的副本成為leader,肯定是要丟資料的。雖然可能會提高一些可用性,但如果你的業務場景丟失資料更加不能忍受,**那還是將unclean.leader.election.enable設定為false吧**。 ### 1.3 消費者丟失 消費者丟失的情況,其實跟消費者位移處理不當有關。消費者位移提交有一個引數,enable.auto.commit,預設是true,決定是否要讓消費者自動提交位移。如果開啟,那麼consumer每次都是先提交位移,再進行消費,比如先跟broker說這5個數據我消費好了,然後才開始慢慢消費這5個數據。 這樣處理的話,好處是簡單,壞處就是漏消費資料,比如你說要消費5個數據,消費了2個自己就掛了。那下次該consumer重啟後,在broker的記錄中這個consumer是已經消費了5個的。 **所以最好的做法就是將enable.auto.commit設定為false,改為手動提交位移,在每次消費完之後再手動提交位移資訊**。當然這樣又有可能會重複消費資料,畢竟exactly once處理一直是一個問題呀(/攤手)。遺憾的是kafka目前沒有保證consumer冪等消費的措施,如果確實需要保證consumer的冪等,可以對每條訊息維持一個全域性的id,每次消費進行去重,當然耗費這麼多的資源來實現exactly once的消費到底值不值,那就得看具體業務了。 ### 1.4 無訊息丟失小結 那麼到這裡先來總結下無訊息丟失的主要配置吧: - producer的acks設定位-1,同時min.insync.replicas設定大於1。並且使用帶有回撥的producer api發生訊息。 - 預設副本數replication.factor設定為大於1,或者建立topic的時候指定大於1的副本數。 - unclean.leader.election.enable 設定為false,防止定期副本leader重選舉 - 消費者端,自動提交位移enable.auto.commit設定為false。在消費完後手動提交位移。 那麼接下來就來說說kafka實現精確一次(exactly once)處理的方法吧。 # 實現精確一次(exactly once)處理 在分散式環境下,要實現訊息一致與精確一次(exactly once)語義處理是很難的。精確一次處理意味著一個訊息只處理一次,造成一次的效果,不能多也不能少。 那麼kafka如何能夠實現這樣的效果呢?在介紹之前,我們先來介紹其他兩個語義,至多一次(at most once)和至少一次(at least once)。 ### 最多一次和至少一次 最多一次就是保證一條訊息只發送一次,這個其實最簡單,非同步傳送一次然後不管就可以,缺點是容易丟資料,所以一般不採用。 至少一次語義是kafka預設提供的語義,它保證每條訊息都能至少接收並處理一次,缺點是可能有重複資料。 前面有介紹過acks機制,當設定producer客戶端的acks是1的時候,broker接收到訊息就會跟producer確認。但producer傳送一條訊息後,可能因為網路原因訊息超時未達,這時候producer客戶端會選擇重發,broker迴應接收到訊息,但很可能最開始傳送的訊息延遲到達,就會造成訊息重複接收。 那麼針對這些情況,要如何實現精確一次處理的語義呢? ### 冪等的producer 要介紹冪等的producer之前,得先了解一下冪等這個詞是什麼意思。冪等這個詞最早起源於函數語言程式設計,意思是一個函式無論執行多少次都會返回一樣的結果。比如說讓一個數加1就不是冪等的,而讓一個數取整就是冪等的。因為這個特性所以冪等的函式適用於併發的場景下。 但冪等在分散式系統中含義又做了進一步的延申,比如在kafka中,冪等性意味著一個訊息無論重複多少次,都會被當作一個訊息來持久化處理。 kafka的producer預設是支援最少一次語義,也就是說不是冪等的,這樣在一些比如支付等要求精確資料的場景會出現問題,在0.11.0後,kafka提供了讓producer支援冪等的配置操作。即: > props.put("enable.idempotence", ture) 在建立producer客戶端的時候,新增這一行配置,producer就變成冪等的了。注意開啟冪等性的時候,acks就自動是“all”了,如果這時候手動將ackss設定為0,那麼會報錯。 而底層實現其實也很簡單,就是對每條訊息生成一個id值,broker會根據這個id值進行去重,從而實現冪等,這樣一來就能夠實現精確一次的語義了。 但是!冪等的producery也並非萬能。有兩個主要是缺陷: - 冪等性的producer僅做到單分割槽上的冪等性,即單分割槽訊息不重複,多分割槽無法保證冪等性。 - 只能保持單會話的冪等性,無法實現跨會話的冪等性,也就是說如果producer掛掉再重啟,無法保證兩個會話間的冪等(新會話可能會重發)。因為broker端無法獲取之前的狀態資訊,所以無法實現跨會話的冪等。 ### 事務的producer 當遇到上述冪等性的缺陷無法解決的時候,可以考慮使用事務了。事務可以支援多分割槽的資料完整性,原子性。並且支援跨會話的exactly once處理語義,也就是說如果producer宕機重啟,依舊能保證資料只處理一次。 開啟事務也很簡單,首先需要開啟冪等性,即設定enable.idempotence為true。然後對producer傳送程式碼做一些小小的修改。 ``` //初始化事務 producer.initTransactions(); try { //開啟一個事務 producer.beginTransaction(); producer.send(record1); producer.send(record2); //提交 producer.commitTransaction(); } catch (KafkaException e) { //出現異常的時候,終止事務 producer.abortTransaction(); } ``` 但無論開啟冪等還是事務的特性,都會對效能有一定影響,這是必然的。所以kafka預設也並沒有開啟這兩個特性,而是交由開發者根據自身業務特點進行處理。 以上~ --- 推薦閱讀: [分散式系統一致性問題與Raft演算法(上)](https://www.cnblogs.com/listenfwind/p/12378701.html) [Scala函數語言程式設計(五) 函式式的錯誤處理](https://www.cnblogs.com/listenfwind/p/12378701.html) [大資料儲存的進化史 --從 RAID 到 Hadoop Hdfs](https://www.cnblogs.com/listenfwind/p/10133772