1. 程式人生 > >Apache Kafka入門教程輕鬆學-第四章 Kafka核心元件和流程-設計-原理(二)協調器(消費者和組協調器)

Apache Kafka入門教程輕鬆學-第四章 Kafka核心元件和流程-設計-原理(二)協調器(消費者和組協調器)

本入門教程,涵蓋Kafka核心內容,通過例項和大量圖表,幫助學習者理解,任何問題歡迎留言。

目錄:

上一節介紹了kafka工作的核心元件--控制器。本節將介紹消費者密切相關的元件--協調器。它負責消費者的出入組工作。大家可以回想一下kafka核心概念中關於吃蘋果的場景,如果我邀請了100個人過來吃蘋果,如果沒有人告訴每個吃蘋果的人哪個是他的盤子,那豈不是要亂了套?協調器做的就是這個工作。當然還有更多。

2 協調器

顧名思義,協調器負責協調工作。本節所講的協調器,是用來協調消費者工作分配的。簡單點說,就是消費者啟動後,到可以正常消費前,這個階段的初始化工作。消費者能夠正常運轉起來,全有賴於協調器。

主要的協調器有如下兩個:

1、消費者協調器(ConsumerCoordinator)

2、組協調器(GroupCoordinator)

此外還有任務管理協調器(WorkCoordinator),用作kafka connect的works管理,本教程不做講解。

kafka引入協調器有其歷史過程,原來consumer資訊依賴於zookeeper儲存,當代理或消費者發生變化時,引發消費者平衡,此時消費者之間是互不透明的,每個消費者和zookeeper單獨通訊,容易造成羊群效應和腦裂問題。

為了解決這些問題,kafka引入了協調器。服務端引入組協調器(GroupCoordinator),消費者端引入消費者協調器(ConsumerCoordinator)。每個broker啟動的時候,都會建立GroupCoordinator例項,管理部分消費組(叢集負載均衡)和組下每個消費者消費的偏移量(offset)。每個consumer例項化時,同時例項化一個ConsumerCoordinator物件,負責同一個消費組下各個消費者和服務端組協調器之前的通訊。如下圖:

2.1 消費者協調器

消費者協調器,可以看作是消費者做操作的代理類(其實並不是),消費者很多操作通過消費者協調器進行處理。

消費者協調器主要負責如下工作:

1、更新消費者快取的MetaData

2、向組協調器申請加入組

3、消費者加入組後的相應處理

4、請求離開消費組

5、向組協調器提交偏移量

6、通過心跳,保持組協調器的連線感知。

7、被組協調器選為leader的消費者的協調器,負責消費者分割槽分配。分配結果傳送給組協調器。

8、非leader的消費者,通過消費者協調器和組協調器同步分配結果。

消費者協調器主要依賴的元件和說明見下圖:

可以看到這些元件和消費者協調器擔負的工作是可以對照上的。

2.2 組協調器

組協調器負責處理消費者協調器發過來的各種請求。它主要提供如下功能:

  1. 在與之連線的消費者中選舉出消費者leader
  2. 下發leader消費者返回的消費者分割槽分配結果給所有的消費者
  3. 管理消費者的消費偏移量提交,儲存在kafka的內部主題中
  4. 和消費者心跳保持,知道哪些消費者已經死掉,組中存活的消費者是哪些。

組協調器在broker啟動的時候例項化,每個組協調器負責一部分消費組的管理。它主要依賴的元件見下圖:

這些元件也是和組協調器的功能能夠對應上的。具體內容不在詳述。

2.3 消費者入組過程

下圖展示了消費者啟動選取leader、入組的過程。

消費者入組的過程,很好的展示了消費者協調器和組協調器之間是如何配合工作的。leader consumer會承擔分割槽分配的工作,這樣kafka叢集的壓力會小很多。同組的consumer通過組協調器保持同步。消費者和分割槽的對應關係持久化在kafka內部主題。

2.4 消費偏移量管理

消費者消費時,會在本地維護消費到的位置(offset),就是偏移量,這樣下次消費才知道從哪裡開始消費。如果整個環境沒有變化,這樣做就足夠了。但一旦消費者平衡操作或者分割槽變化後,消費者不再對應原來的分割槽,而每個消費者的offset也沒有同步到伺服器,這樣就無法接著前任的工作繼續進行了。

因此只有把消費偏移量定期傳送到伺服器,由GroupCoordinator集中式管理,分割槽重分配後,各個消費者從GroupCoordinator讀取自己對應分割槽的offset,在新的分割槽上繼續前任的工作。

下圖展示了不提交offset到服務端的問題:

開始時,consumer 0消費partition 0 和1,後來由於新的consumer 2入組,分割槽重新進行了分配。consumer 0不再消費partition2,而由consumer 2來消費partition 2,但由於consumer之間是不能通訊的,所有consumer2並不知道從哪裡開始自己的消費。

因此consumer需要定期提交自己消費的offset到服務端,這樣在重分割槽操作後,每個consumer都能在服務端查到分配給自己的partition所消費到的offset,繼續消費。

由於kafka有高可用和橫向擴充套件的特性,當有新的分割槽出現或者新的消費入組後,需要重新分配消費者對應的分割槽,所以如果偏移量提交的有問題,會重複消費或者丟訊息。偏移量提交的時機和方式要格外注意!!

下面兩種情況分別會造成重複消費和丟訊息:

  1. 如果提交的偏移量小於消費者最後一次消費的偏移量,那麼再均衡後,兩個offset之間的訊息就會被重複消費
  2. 如果提交的偏移量大於消費者最後一次消費的偏移量,那麼再均衡後,兩個offset之間的訊息就會丟失

以上兩種情況是如何產生的呢?我們繼續往下看。

2.4.1 偏移量有兩種提交方式

1、自動提交偏移量

設定 enable.auto.commit為true,設定好週期,預設5s。消費者每次呼叫輪詢訊息的poll() 方法時,會檢查是否超過了5s沒有提交偏移量,如果是,提交上一次輪詢返回的偏移量。

這樣做很方便,但是會帶來重複消費的問題。假如最近一次偏移量提交3s後,觸發了再均衡,伺服器端儲存的還是上次提交的偏移量,那麼再均衡結束後,新的消費者會從最後一次提交的偏移量開始拉取訊息,此3s內消費的訊息會被重複消費。

2、手動提交偏移量

設定 enable.auto.commit為false。程式中手動呼叫commitSync()提交偏移量,此時提交的是poll方法返回的最新的偏移量。

我們來看下面兩個提交時機:

  • 如果poll完馬上呼叫commitSync(),那麼一旦處理到中間某條訊息的時候異常,由於偏移量已經提交,那麼出問題的訊息位置到提交偏移量之間的訊息就會丟失。

  • 如果處理完所有訊息後才呼叫commitSync()。有可能在處理到一半的時候發生再均衡,此時偏移量還未提交,那麼再均衡後,會從上次提交的位置開始消費,造成重複消費。

比較起來,重複消費要比丟訊息好一些,所以我們程式應採用第二種方式,同時消費邏輯中,要能夠檢查重複消費。

commitSync()是同步提交偏移量,主程式會一直阻塞,偏移量提交成功後才往下執行。這樣會限制程式的吞吐量。如果降低提交頻次,又很容易發生重複消費。

這裡我們可以使用commitAsync()非同步提交偏移量。只管提交,而不會等待broker返回提交結果

commitSync只要沒有發生不可恢復錯誤,會進行重試,直到成功。而commitAsync不會進行重試,失敗就是失敗了。commitAsync不重試,是因為重試提交時,可能已經有其它更大偏移量已經提交成功了,如果此時重試提交成功,那麼更小的偏移量會覆蓋大的偏移量。那麼如果此時發生再均衡,新的消費者將會重複消費訊息。

commitAsync也支援回撥,由於上述原因,回撥中最好不要因為失敗而重試提交。而是應該記錄錯誤,以便後續分析和補償。

2.4.2  偏移量提交的最佳實踐

關於偏移量的提交方式和時機,上文已經有了大量的講解。但看完後好像還不知道應該怎麼提交偏移量才是最合適的。是不是覺得無論怎麼提交,都無法避免重複消費?沒錯,事實就是這樣,我們只能採用合理的方式,最大可能的去降低發生此類問題的概率。此外做好補償處理。

一般來說,偶爾的提交失敗,不去重試,是沒有問題的。因為一般是因為臨時的問題而失敗,後續的提交總會成功。如果我們在關閉消費者或者再均衡前,確保所有的消費者都能成功提交一次偏移量,也可以保證再均衡後,消費者能接著消費資料。

因此我們採用同步和非同步混合的方式提交偏移量。

  1. 正常消費訊息時,消費結束提交偏移量,採用非同步方式
  2. 如果程式報錯,finally中,提交偏移量,採用同步方式,確保提交成功
  3. 再均衡前的回撥方法中,提交偏移量,採用同步方式,確保提交成功

這樣既保證了吞吐量,也保證了提交偏移量的安全性。另外由於再均衡前提交偏移量,降低了重複消費可能。

kafka還提供了提交特定偏移量的方法。我們可以指定分割槽和offset進行提交。分割槽和offset的值可以從訊息物件中取得。

另外,如果擔心一次取回資料量太大,可能處理到一半的時候出現再均衡,導致偏移量沒有提交,重複消費。那麼可以每n條提交一次。

而當n=1時,也就是處理一條資料就提交一次,會把重複消費的可能降到最低。同時由於增加了和服務端的通訊,效率大大降低。

其實即使這樣,也是可能重複消費的,試想如下場景:

  1. 消費者拉取到資料後,開始邏輯處理
  2. 處理第一條offset=2,成功了,提交offset=3
  3. 開始處理offset=3的訊息,處理完成後,但提交offset=4前,此消費者突然意外掛掉了,所以也沒能進入異常處理。偏移量沒能成功提交。
  4. 消費者進行了再均衡,新的消費者接手此分割槽進行消費,取到的offset還是上一次提交的3,那麼將會重複消費offset=3的訊息。

所以我們應平衡重複消費發生的概率和程式的效率,來設定提交的時機。同時程式邏輯一定做好重複消費的檢查工作!

2.5 回顧

本節從協調器講起,首先介紹了消費者協調器和組協調器,以及他們是如何配合工作的。從消費偏移量的管理展開,詳細介紹了偏移量的提交,及提交的最佳實踐。本節沒有涉及程式碼部分,所有知識點相關的程式碼將在最後一章中統一給出。現在的要求只是理解知識點。