1. 程式人生 > >kafka重複消費問題

kafka重複消費問題

問題描述
採用kafka讀取訊息進行處理時,consumer會重複讀取afka佇列中的資料。

問題原因
kafka的consumer消費資料時首先會從broker裡讀取一批訊息資料進行處理,處理完成後再提交offset。而我們專案中的consumer消費能力比較低,導致取出的一批資料在session.timeout.ms時間內沒有處理完成,自動提交offset失敗,然後kafka會重新分配partition給消費者,消費者又重新消費之前的一批資料,又出現了消費超時,所以會造成死迴圈,一直消費相同的資料。

解決方案
專案中使用的是spring-kafka,所以把kafka消費者的配置enable.auto.commit設為false,禁止kafka自動提交offset,從而使用spring-kafka提供的offset提交策略。spring-kafka中的offset提交策略可以保證一批訊息資料沒有完成消費的情況下,也能提交offset,從而避免了提交失敗而導致永遠重複消費的問題。

>

首先來看看spring-kafka的消費執行緒邏輯

if (isRunning() && this.definedPartitions != null) { 
    initPartitionsIfNeeded();      
// we start the invoker here as there will be no rebalance calls to       
// trigger it, but only if the container is not set to autocommit       
// otherwise we will process records on a separate thread      
   if (!this.autoCommit) {        
          startInvoker();     
   }
}

上面可以看到,如果auto.commit關掉的話,spring-kafka會啟動一個invoker,這個invoker的目的就是啟動一個執行緒去消費資料,他消費的資料不是直接從kafka裡面直接取的,那麼他消費的資料從哪裡來呢?他是從一個spring-kafka自己建立的阻塞佇列裡面取的。

然後會進入一個迴圈,從原始碼中可以看到如果auto.commit被關掉的話, 他會先把之前處理過的資料先進行提交offset,然後再去從kafka裡面取資料。

然後把取到的資料丟給上面提到的阻塞列隊,由上面建立的執行緒去消費,並且如果阻塞佇列滿了導致取到的資料塞不進去的話,spring-kafka會呼叫kafka的pause方法,則consumer會停止從kafka裡面繼續再拿資料。

接著spring-kafka還會處理一些異常的情況,比如失敗之後是不是需要commit offset這樣的邏輯。