1. 程式人生 > >詳細解析kafka之 kafka消費者組與重平衡機制

詳細解析kafka之 kafka消費者組與重平衡機制

消費組組(Consumer group)可以說是kafka很有亮點的一個設計。傳統的訊息引擎處理模型主要有兩種,佇列模型,和釋出-訂閱模型。

佇列模型:早期訊息處理引擎就是按照佇列模型設計的,所謂佇列模型,跟佇列資料結構類似,生產者產生訊息,就是入隊,消費者接收訊息就是出隊,並刪除佇列中資料,訊息只能被消費一次。但這種模型有一個問題,那就是隻能由一個消費者消費,無法直接讓多個消費者消費資料。基於這個缺陷,後面又演化出釋出-訂閱模型。

釋出-訂閱模型:釋出訂閱模型中,多了一個主題。消費者會預先訂閱主題,生產者寫入訊息到主題中,只有訂閱了該主題的消費者才能獲取到訊息。這樣一來就可以讓多個消費者消費資料。

以往的訊息處理引擎大多隻支援其中一種模型,但藉助kafka的消費者組機制,可以同時實現這兩種模型。同時還能夠對消費組進行動態擴容,讓消費變得易於伸縮。

這篇我們先介紹下消費者組,然後主要討論kafka著名的重平衡機制。

kafka消費者組

所謂消費者組,那自然是由消費者組成的,組內可以有一個或多個消費者例項,而這些消費者例項共享一個id,稱為group id。對了,預設建立消費者的group id是在KAFKA_HOME/conf/consumer.properties檔案中定義的,開啟就能看到。預設的group id值是test-consumer-group。

消費者組內的所有成員一起訂閱某個主題的所有分割槽,注意一個消費者組中,每一個分割槽只能由組內的一消費者訂閱。

看看下面這張圖,這是kakfa官網上給出的說明圖。

這張圖應該很好的說明了消費者組,我們從上到下解釋一下,kafka cluster中有兩臺broker伺服器,每一臺都有兩個分割槽,這四個分割槽都是同一個topic下的。下左的消費者組A,組內有兩個消費者,每個消費者負責兩個分割槽的消費,而右邊的消費者組B有四個消費者,每個負責消費一個分割槽。

當消費者組中只有一個消費者的時候,就是訊息佇列模型,不然就是釋出-訂閱模型,並且易於伸縮。

消費者組內消費者數量

上面那張圖,仔細推敲一下就會發現,圖中其實已經有一些既定的事實,比如消費者組內消費者小於或等於分割槽數,以及topic分割槽數剛好是消費者組內成員數的倍數。

那麼如果消費者組內成員數超過分割槽數會怎樣呢?比如有4個分割槽,但消費者組內有6個消費者,這時候有2個消費者不會分配分割槽,它會一直空閒。

而如果消費者不是分割槽的倍數,比如topic內有4個分割槽,而消費者組內有三個消費者,那怎麼辦呢?這時候只會有兩個消費者分別被分配兩個分割槽,第三個消費者同樣空閒。

所以,消費者組內的消費者數量最好是與分割槽數持平,再不濟,最好也是要是分割槽數的數量成比例。

檢視叢集中的消費者組

這裡順便說下如何檢視消費者組及組內消費情況,可以使用ConsumerGroupCommand命令工具,來檢視具體的kafka消費者組。注意,這裡都是以最新版的kafka版本,也就是2.+版本。

可以使用如下命令列出當前叢集中的kafka組資訊。

> bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
 
test-consumer-group

具體到某個組的消費者情況,可以使用下面這條命令工具:

> bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group
 
TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                    HOST            CLIENT-ID
topic3          0          241019          395308          154289          consumer2-e76ea8c3-5d30-4299-9005-47eb41f3d3c4 /127.0.0.1      consumer2
topic2          1          520678          803288          282610          consumer2-e76ea8c3-5d30-4299-9005-47eb41f3d3c4 /127.0.0.1      consumer2
topic3          1          241018          398817          157799          consumer2-e76ea8c3-5d30-4299-9005-47eb41f3d3c4 /127.0.0.1      consumer2
topic1          0          854144          855809          1665            consumer1-3fc8d6f1-581a-4472-bdf3-3515b4aee8c1 /127.0.0.1      consumer1
topic2          0          460537          803290          342753          consumer1-3fc8d6f1-581a-4472-bdf3-3515b4aee8c1 /127.0.0.1      consumer1
topic3          2          243655          398812          155157          consumer4-117fe4d3-c6c1-4178-8ee9-eb4a3954bee0 /127.0.0.1      consumer4

重平衡(Rebalance)

說完消費者組,再來說說與消費者組息息相關的重平衡機制。重平衡可以說是kafka為人詬病最多的一個點了。

重平衡其實就是一個協議,它規定了如何讓消費者組下的所有消費者來分配topic中的每一個分割槽。比如一個topic有100個分割槽,一個消費者組內有20個消費者,在協調者的控制下讓組內每一個消費者分配到5個分割槽,這個分配的過程就是重平衡。

重平衡的觸發條件主要有三個:

  • 消費者組內成員發生變更,這個變更包括了增加和減少消費者。注意這裡的減少有很大的可能是被動的,就是某個消費者崩潰退出了
  • 主題的分割槽數發生變更,kafka目前只支援增加分割槽,當增加的時候就會觸發重平衡
  • 訂閱的主題發生變化,當消費者組使用正則表示式訂閱主題,而恰好又新建了對應的主題,就會觸發重平衡

為什麼說重平衡為人詬病呢?因為重平衡過程中,消費者無法從kafka消費訊息,這對kafka的TPS影響極大,而如果kafka集內節點較多,比如數百個,那重平衡可能會耗時極多。數分鐘到數小時都有可能,而這段時間kafka基本處於不可用狀態。所以在實際環境中,應該儘量避免重平衡發生。

瞭解了什麼是重平衡,重平衡的缺點和觸發條件後,我們先來看看重平衡的三種不同策略,然後說說應該如何避免重平衡發生。

三種重平衡策略

kafka提供了三種重平衡分配策略,這裡順便介紹一下:

Range

具體實現位於,package org.apache.kafka.clients.consumer.RangeAssignor。

這種分配是基於每個主題的分割槽分配,如果主題的分割槽分割槽不能平均分配給組內每個消費者,那麼對該主題,某些消費者會被分配到額外的分割槽。我們來看看具體的例子。

舉例:目前有兩個消費者C0和C1,兩個主題t0和t1,每個主題三個分割槽,分別是t0p0,t0p1,t0p2,和t1p0,t1p1,t1p2。

那麼分配情況會是:

  • C0:t0p0, t0p1, t1p0, t1p1
  • C1:t0p2, t1p2

我來大概解釋一下,range這種模式,消費者被分配的單位是基於主題的,拿上面的例子來說,是主題t0的三個分割槽分配給2個消費者,t1三個分割槽分配給消費者。於是便會出現消費者c0分配到主題c0兩個主題,c1兩個主題的情況,而非每個消費者分配兩個主題各三個分割槽。

RoundRobin

具體實現位於,package org.apache.kafka.clients.consumer.RoundRobinAssignor。

RoundRobin是基於全部主題的分割槽來進行分配的,同時這種分配也是kafka預設的rebalance分割槽策略。還是用剛剛的例子來看,

舉例:兩個消費者C0和C1,兩個主題t0和t1,每個主題三個分割槽,分別是t0p0,t0p1,t0p2,和t1p0,t1p1,t1p2。

由於是基於全部主題的分割槽,那麼分配情況會是:

  • C0:t0p0, t0p1, t1p1
  • C1:t1p0, t0p2, t1p2

因為是基於全部主題的分割槽來平均分配給消費者,所以這種分配策略能更加均衡得分配分割槽給每一個消費者。

上面說的都是同一消費者組內消費組都訂閱相同主題的情況。更復雜的情況是,同一組內的消費者訂閱不同的主題,那麼任然可能會導致分割槽不均衡的情況。

還是舉例說明,有三個消費者C0,C1,C2 。三個主題t0,t1,t2,分別有1,2,3個分割槽 t0p0,t1p0,t1p1,t2p0,t2p1,t2p2。

其中,C0訂閱t0,C1訂閱t0,t1。C2訂閱t0,t1,t2。最終訂閱情況如下:

  • C0:t0p0
  • C1:t1p0
  • C2:t1p1,t2p0,t2p1,t2p2

這個結果乍一看有點迷,其實可以這樣理解,按照序號順序進行迴圈分配,t0只有一個分割槽,先碰到C0就分配給它了。t1有兩個分割槽,被C1和C2訂閱,那麼會迴圈將兩個分割槽分配出去,最後到t2,有三個分割槽,卻只有C2訂閱,那麼就將三個分割槽分配給C2。

Sticky

Sticky分配策略是最新的也是最複雜的策略,其具體實現位於package org.apache.kafka.clients.consumer.StickyAssignor。

這種分配策略是在0.11.0才被提出來的,主要是為了一定程度解決上面提到的重平衡非要重新分配全部分割槽的問題。稱為粘性分配策略。

聽名字就知道,主要是為了讓目前的分配儘可能保持不變,只挪動儘可能少的分割槽來實現重平衡。

還是舉例說明,有三個消費者C0,C1,C2 。三個主題t0,t1,t2,t3。每個主題各有兩個分割槽, t0p0,t0p1,t1p0,t1p1,t2p0,t2p1,t3p0,t3p1。

現在訂閱情況如下:

  • C0:t0p0,t1p1,t3p0
  • C1:t0p1,t2p0,t3p1
  • C2:t1p0,t2p1

假設現在C1掛掉了,如果是RoundRobin分配策略,那麼會變成下面這樣:

  • C0:t0p0,t1p0,t2p0,t3p0
  • C2:t0p1,t1p1,t2p1,t3p1

就是說它會全部重新打亂,再分配,而如何使用Sticky分配策略,會變成這樣:

  • C0:t0p0,t1p1,t3p0,t2p0
  • C2:t1p0,t2p1,t0p1,t3p1

也就是說,儘可能保留了原來的分割槽情況,不去改變它,在這個基礎上進行均衡分配,不過這個策略目前似乎還有些bug,所以實際使用也不多。

避免重平衡

要說完全避免重平衡,那是不可能滴,因為你無法完全保證消費者不會故障。而消費者故障其實也是最常見的引發重平衡的地方,所以這裡主要介紹如何盡力避免消費者故障。

而其他幾種觸發重平衡的方式,增加分割槽,或是增加訂閱的主題,抑或是增加消費者,更多的是主動控制,這裡也不多討論。

首先要知道,如果消費者真正掛掉了,那我們是沒有什麼辦法的,但實際中,會有一些情況,會讓kafka錯誤地認為一個正常的消費者已經掛掉了,我們要的就是避免這樣的情況出現。

當然要避免,那首先要知道哪些情況會出現錯誤判斷掛掉的情況。在分散式系統中,通常是通過心跳來維持分散式系統的,kafka也不例外。對這部分內容有興趣可以看看我之前的這篇分散式系統一致性問題與Raft演算法(上)。這裡要說的是,在分散式系統中,由於網路問題你不清楚沒接收到心跳,是因為對方真正掛了還是隻是因為負載過重沒來得及發生心跳或是網路堵塞。所以一般會約定一個時間,超時即判定對方掛了。而在kafka消費者場景中,session.timout.ms引數就是規定這個超時時間是多少。

還有一個引數,heartbeat.interval.ms,這個引數控制傳送心跳的頻率,頻率越高越不容易被誤判,但也會消耗更多資源。

此外,還有最後一個引數,max.poll.interval.ms,我們都知道消費者poll資料後,需要一些處理,再進行拉取。如果兩次拉取時間間隔超過這個引數設定的值,那麼消費者就會被踢出消費者組。也就是說,拉取,然後處理,這個處理的時間不能超過max.poll.interval.ms這個引數的值。這個引數的預設值是5分鐘,而如果消費者接收到資料後會執行耗時的操作,則應該將其設定得大一些。

小結一下,其實主要就是三個引數,session.timout.ms控制心跳超時時間,heartbeat.interval.ms控制心跳傳送頻率,以及max.poll.interval.ms控制poll的間隔。這裡給出一個相對較為合理的配置,如下:

  • session.timout.ms:設定為6s
  • heartbeat.interval.ms:設定2s
  • max.poll.interval.ms:推薦為消費者處理訊息最長耗時再加1分鐘