1. 程式人生 > >kafka一直rebalance故障,重複消費

kafka一直rebalance故障,重複消費

今天我司線上kafka訊息代理出現錯誤日誌,異常rebalance,而且平均間隔2到3分鐘就會rebalance一次,分析日誌發現比較嚴重。錯誤日誌如下

08-09 11:01:11 131 pool-7-thread-3 ERROR [] - commit failed org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records. at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:713) ~[MsgAgent-jar-with-dependencies.jar:na] at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:596) ~[MsgAgent-jar-with-dependencies.jar:na] at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1218) ~[MsgAgent-jar-with-dependencies.jar:na] at com.today.eventbus.common.MsgConsumer.run(MsgConsumer.java:121) ~[MsgAgent-jar-with-dependencies.jar:na] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_161] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_161] at java.lang.Thread.run(Thread.java:748) [na:1.8.0_161] 

這個錯誤的意思是,消費者在處理完一批poll的訊息後,在同步提交偏移量給broker時報的錯。初步分析日誌是由於當前消費者執行緒消費的分割槽已經被broker給回收了,因為kafka認為這個消費者死了,那麼為什麼呢?

分析問題

這裡就涉及到問題是消費者在建立時會有一個屬性max.poll.interval.ms
該屬性意思為kafka消費者在每一輪poll()呼叫之間的最大延遲,消費者在獲取更多記錄之前可以空閒的時間量的上限。如果此超時時間期滿之前poll()沒有被再次呼叫,則消費者被視為失敗,並且分組將重新平衡,以便將分割槽重新分配給別的成員。

 
image.png

 

如上圖,在while迴圈裡,我們會迴圈呼叫poll拉取broker中的最新訊息。每次拉取後,會有一段處理時長,處理完成後,會進行下一輪poll。引入該配置的用途是,限制兩次poll之間的間隔,訊息處理邏輯太重,每一條訊息處理時間較長,但是在這次poll()到下一輪poll()時間不能超過該配置間隔,協調器會明確地讓使用者離開組,並觸發新一輪的再平衡。
max.poll.interval.ms預設間隔時間為300s

分析日誌

從日誌中我們能看到poll量有時能夠達到250多條


 
image.png

一次性拉取250多條訊息進行消費,而由於每一條訊息都有一定的處理邏輯,根據以往的日誌分析,每條訊息平均在500ms內就能處理完成。然而,我們今天查到有兩條訊息處理時間超過了1分鐘。

訊息處理日誌1

08-09 08:50:05 430 pool-7-thread-3 INFO [] - [RestKafkaConsumer] receive message (收到訊息,準備過濾,然後處理), topic: member_1.0.0_event ,partition: 0 ,offset: 1504617 08-09 08:50:05 431 pool-7-thread-3 INFO [] - [RestKafkaConsumer]:解析訊息成功,準備請求呼叫! 
08-09 08:51:05 801 pool-7-thread-3 INFO [] - [HttpClient]:response code: {"status":200,"data":{"goodsSendRes":{"status":400,"info":"指>定商品送沒有可用的營銷活動--老pos機"},"fullAmountSendRes":{"status":400,"info":"滿額送沒有可用的營銷活動--老pos機"}},"info":"發券流程執 行成功"}, event:com.today.api.member.events.ConsumeFullEvent, url:https://wechat-lite.today36524.com/api/dapeng/subscribe/index,event內 容:{"id":36305914,"score":16,"orderPrice":15.9,"payTime":1533775401000,"thirdTransId":"4200000160201808 

訊息處理日誌2

08-09 08:51:32 450 pool-7-thread-3 INFO [] - [RestKafkaConsumer] receive message (收到訊息,準備過濾,然後處理), topic: member_1.0.0_event ,partition: 0 ,offset: 1504674 08-09 08:51:32 450 pool-7-thread-3 INFO [] - [RestKafkaConsumer]:解析訊息成功,準備請求呼叫! 
08-09 08:52:32 843 pool-7-thread-3 INFO [] - [HttpClient]:response code: {"status":200,"data":{"goodsSendRes":{"status":400,"info":"指>定商品送沒有可用的營銷活動--老pos機"},"fullAmountSendRes":{"status":400,"info":"滿額送沒有可用的營銷活動--老pos機"}},"info":"發券流程執 行成功"}, event:com.today.api.member.events.ConsumeFullEvent, url:https://wechat-lite.today36524.com/api/dapeng/subscribe/index,event內 容:{"id":36306061,"score":3,"orderPrice":3.0,"payTime":1533775482000,"thirdTransId":"420000016320180809 

我們看到訊息消費時間都超過了1分鐘。

分析原因

如下是我們消費者處理邏輯(省略部分程式碼)

 while (isRunning) {
            ConsumerRecords<KEY, VALUE> records = consumer.poll(100);
            if (records != null && records.count() > 0) { for (ConsumerRecord<KEY, VALUE> record : records) { dealMessage(bizConsumer, record.value()); try { //records記錄全部完成後,才提交 consumer.commitSync(); } catch (CommitFailedException e) { logger.error("commit failed,will break this for loop", e); break; } } } 

poll()方法該方法輪詢返回訊息集,呼叫一次可以獲取一批訊息。

kafkaConsumer呼叫一次輪詢方法只是拉取一次訊息。客戶端為了不斷拉取訊息,會用一個外部迴圈不斷呼叫消費者的輪詢方法。每次輪詢到訊息,在處理完這一批訊息後,才會繼續下一次輪詢。但如果一次輪詢返回的結構沒辦法及時處理完成,會有什麼後果呢?服務端約定了和客戶端max.poll.interval.ms,兩次poll最大間隔。如果客戶端處理一批訊息花費的時間超過了這個限制時間,服務端可能就會把消費者客戶端移除掉,並觸發rebalance

拉取偏移量與提交偏移量

kafka的偏移量(offset)是由消費者進行管理的,偏移量有兩種,拉取偏移量(position)與提交偏移量(committed)。拉取偏移量代表當前消費者分割槽消費進度。每次訊息消費後,需要提交偏移量。在提交偏移量時,kafka會使用拉取偏移量的值作為分割槽的提交偏移量傳送給協調者。
如果沒有提交偏移量,下一次消費者重新與broker連線後,會從當前消費者group已提交到broker的偏移量處開始消費。
所以,問題就在這裡,當我們處理訊息時間太長時,已經被broker剔除,提交偏移量又會報錯。所以拉取偏移量沒有提交到broker,分割槽又rebalance。下一次重新分配分割槽時,消費者會從最新的已提交偏移量處開始消費。這裡就出現了重複消費的問題。

解決方案

1.增加max.poll.interval.ms處理時長

kafka消費者預設此間隔時長為300s

max.poll.interval.ms=300

2.設定分割槽拉取閾值

kafkaConsumer呼叫一次輪詢方法只是拉取一次訊息。客戶端為了不斷拉取訊息,會用一個外部迴圈不斷呼叫輪詢方法poll()。每次輪詢後,在處理完這一批訊息後,才會繼續下一次的輪詢。

max.poll.records = 50

3.poll到的訊息,處理完一條就提交一條,當出現提交失敗時,馬上跳出迴圈,這時候kafka就會進行rebalance,下一次會繼續從當前offset進行消費。

 while (isRunning) {
            ConsumerRecords<KEY, VALUE> records = consumer.poll(100);
            if (records != null && records.count() > 0) { for (ConsumerRecord<KEY, VALUE> record : records) { dealMessage(bizConsumer, record.value()); try { //records記錄全部完成後,才提交 consumer.commitSync(); } catch (CommitFailedException e) { logger.error("commit failed,will break this for loop", e); break; } } } 

附錄 查詢日誌 某個topic的 partition 的rebalance過程

member_1分割槽

時間 revoked position revoked committed 時間 assigned
08:53:21 1508667 1508509 08:57:17 1508509
09:16:31 1509187 1508509 09:21:02 1508509
09:23:18 1509323 1508509 09:26:02 1508509
09:35:16 1508509 1508509 09:36:03 1508509
09:36:21 1508509 1508509 09:41:03 1508509
09:42:15 1509323 1508509 09:46:03 1508509
09:47:19 1508509 1508509 09:51:03 1508509
09:55:04 1509323 1509323 09:56:03 1509323
多餘消費 被回滾 重複消費 10:01:03 1509323
10:02:20 1510205 1509323 10:06:03 1509323
10:07:29 1509323 1509323 10:08:35 1509323
10:24:43 1509693 1509693 10:25:18 1509693
10:28:38 1510604 1510604 10:35:18 1510604
10:36:37 1511556 1510604 10:40:18 1510604
10:54:26 1511592 1511592 10:54:32 1511592
- - - 10:59:32 1511979
11:01:11 1512178 1512178 11:03:40 1512178
11:04:35 1512245 1512245 11:08:49 1512245
11:12:47 1512407 1512407 11:12:49 1512407


作者:楓葉lhz
連結:https://www.jianshu.com/p/271f88f06eb3
來源:簡書
簡書著作權歸作者所有,任何形式的轉載都請聯絡作者獲得授權並註明出處。