1. 程式人生 > >Kafka技術內幕:消費者(高階和低階API)和 協調者

Kafka技術內幕:消費者(高階和低階API)和 協調者

生產者傳送訊息時在客戶端就按照節點和Partition進行分組,屬於同一個目標節點的多個Partition會作為同一個請求傳送到服務端,作為目標節點的服務端也可以處理來自不同生產者客戶端的請求。如果從網路層通訊來看,客戶端和服務端都會使用佇列的方式確保順序地客戶端傳送請求,服務端接收請求,服務端傳送響應,客戶端接收響應。從儲存層來看,生產者會將訊息分發到不同節點的不同Partition上,服務端的一個Partition的資料會來源於多個生產者。多個服務端節點組成的Kafka叢集在物理層將訊息分佈在不同節點的不同Partition上,並且是以提交日誌的形式追加到每個Partition中。對訊息進行分割槽的好處是可以將大量的訊息分成多批資料同時寫到不同節點上,將寫請求分擔負載到各個節點。

訊息系統的組成是生產者,儲存系統和消費者,消費者會從儲存系統讀取生產者寫入的訊息。Kafka作為分散式的訊息系統支援多個生產者和多個消費者,生產者可以將訊息分佈到叢集中不同節點的不同Partition上,消費者也可以消費叢集中多個Partition的多個Partition。寫訊息時允許多個生產者寫到同一個Partition中,不過如果讀訊息時有多個消費者要同時讀取同一個Partition,就需要在Partition級別的日誌檔案上控制確保將日誌檔案的不同資料分配給不同的消費者(不應該將同一份資料分配給兩個相同的消費者,否則同一條訊息就被重複處理了,雖然Kafka本身在消費者出現故障時可能會重複處理訊息,但是如果在正常消費時就開始重複處理,這條路顯然走不通),這種控制手段通常採用加鎖同步嚴重影響效能的方式,所以如果我們約定同一個Partition只允許被一個消費者處理就不需要加鎖同步了,不存在併發訪問了,可以大大提升消費者的處理能力,而且也並不違反訊息的處理語義:原先需要多個消費者處理,現在交給一個消費者處理也不是不可以,只要有消費者處理訊息就可以了。

圖4-1舉例了一種最簡單的訊息系統部署模式,生產者的資料來源多種多樣,它們都統一寫入到Kafka叢集中,處理訊息時有多個消費者進行任務分擔,這些消費者的處理邏輯都是相同的,每個消費者處理的Partition都是不會重複的。

圖4-1 訊息系統包括生產者、消費者和儲存系統

不過實際應用中訊息通常存在多種處理方式,將圖4-1中的多個消費者放到同一個消費組中,不同的消費組都可以有數量不同的消費者,比如可以根據實際情況對業務邏輯比較重要的消費組分配更多的消費者資源。圖4-2示例了將訊息系統作為資料處理系統的核心,消費組1將訊息儲存到Hadoop供離線分析,消費組3將訊息儲存到搜尋引擎中,消費組2讀取出訊息時使用Storm/Spark等流處理系統進行實時分析。

圖4-2 不同消費組消費同一份訊息

Kafka採用消費組保證了:一個Partition只允許被一個消費組中的一個消費者所消費,得出的結論是:在一個消費組中,一個消費者可以消費多個Partition,不同的消費者消費的Partition一定不會重複,所有消費者一起消費所有的Partition;在不同消費組中,每個消費組都會消費所有的Partition。也就是同一個消費組下消費者對Partition是互斥的,而不同消費組之間是共享的。比如有兩個消費者訂閱了一個topic,如果這兩個消費者在不同的消費組中,則每個消費者都會獲取到這個topic所有的記錄;如果這兩個消費者是在同一個消費組中,則它們會各自獲取到一半的記錄(兩者的記錄是對半分的,而且都是不重複的)。圖4-3示例了多個消費者都在同一個消費組中(右圖)或者各自組成一個消費組(左圖)的不同消費場景,這樣Kafka也可以實現傳統訊息佇列的釋出訂閱模型和佇列模型:

  1. 同一條訊息會被多個消費組消費,如果有多個消費組,每個消費組只有一個消費者,實現廣播(釋出訂閱模式)

  2. 只有一個消費組,這個消費組有多個消費者,一條訊息只會被這個消費組的一個消費者所消費,實現單播(佇列模式)

圖4-3 傳統訊息佇列的釋出訂閱模型和佇列模型

實際應用中如圖4-4消費者和消費組的組成通常是有多個消費組,並且每個消費組中也有多個消費組,這樣既可以允許多種不同業務邏輯的消費組存在,也保證了同一個消費組內的多個消費者的協調工作,避免一個消費組只有一個消費者引起的資料丟失。

圖4-4 Kafka叢集的典型部署方式
圖片引自:http://kafka.apache.org/documentation.html

Kafka使用消費組的概念,允許一組消費者程序對消費和讀取記錄的工作進行劃分,每個消費者都可以配置一個所屬的消費組並且訂閱某些主題,Kafka會發送每條訊息給每個消費組中的一個消費者執行緒(同一條訊息廣播給多個消費組,單播給同一組中的消費者),這是通過對每個消費組的所有消費者執行緒將訂閱topic的所有partitions進行平衡負載rebalance,簡單點說就是將topic的所有Partition平均負載給消費組中的所有消費者。比如一個topic有4個Partition,一個消費組有2個消費者,則每個消費者都會分配到兩個Partition。

一個消費組有多個消費者,因此消費組需要維護所有的消費者,如果一個消費者當掉了,分配給這個消費者的Partition需要被重新分配給相同組的其他消費者;如果一個消費者加入了同一個組,之前分配給其他消費組的Partition需要分配給新加入的消費者。實際上一旦有消費者加入或退出消費組,導致消費組成員列表發生變化,即使Kafka叢集的Partition沒有變化,消費組中所有的消費者也都要觸發重新rebalance的工作。當然如果叢集的Partition發生變化,即使消費組成員沒有變化,所有的消費者也都要重新rebalance。圖4-5中模擬了加入一個新的消費者,導致Partition的分配發生變化從而觸發所有消費者都發生了rebalance。

圖4-5 消費組成員變化引起所有消費者發生rebalance

消費組中的所有消費者發生rebalance時,消費者在rebalance前後分配到的Partition會完全不同,那麼消費者們之間是如何確保各自消費的訊息平滑遷移和過渡,假設Partition1原先分配給消費者1,經過rebalance後被分配給了消費者2,在rebalalance前消費者1對Partition1的消費進度需要被儲存下來,這樣在rebalance後,消費者1可以從儲存的進度位置繼續讀取Partition1,確保了Partition1不管分配給哪個消費者,訊息並不會被重複處理。

由於消費者消費訊息的最小單元是Partition,所以每個Partition都應該記錄消費進度,而且這種資料應該面向消費組級別。假設面向的是消費者級別,relabalce前Partition1只記錄到消費者1中,rebalance後Partition1屬於消費者2,但是Partition1和消費者2之前沒有記錄任何資訊就無法做到無縫遷移。而如果針對消費組,因為消費者1和消費者2都屬於同一個消費組,rebalance前記錄Partition1到消費組1,rebalance後消費者2可以正常地讀取消費組1的Partition1進度,還是可以準確地還原出這個Partition在消費組1中的最新進度。儲存Partition的消費進度通常藉助外部的儲存系統比如ZooKeeper或者Kafka內部的topic。這樣發生reabalance前後Partition的不同擁有者因為讀取的是同一份共享儲存,消費者成員的變化並不會影響訊息的消費和處理。

所以雖然Partition是以消費者級別被消費的,不過Partition的消費進度要儲存成消費組級別。消費組雖然是一個包含所有消費者的邏輯概念,它並不執行具體的訊息消費邏輯,但是它卻把大家都統一起來,如果沒有這一層總管,各個消費者之間持有各自的Partition消費進度,但是又不互相認識,在Partition發生變動時,進度訊息就沒有辦法同步給其他消費者。舉例現實社會在協作分工時通常都有一個管理員角色(消費組)負責管理所有的工人(消費者),任務(Partition)具體分配給哪些工人都是由管理員決定的。如果工人數量發生變化比如有人加入或離職,或者任務增加或減少,每個工人都會被重新分配到不同的任務。圖4-6中消費者消費訊息時需要定時地將最新的消費進度儲存到ZooKeeper中,當發生rebalance時,新的消費者擁有的新的Partition都可以從ZooKeeper中讀取出來恢復到最近的狀態。

圖4-6 消費進度的儲存和恢復

負責消費Partition的每個消費者都是一個消費程序,而且消費者本身也可以是多執行緒的應用程式,因為一個Partition只能屬於一個消費者執行緒,所以存在如下幾種不同的場景:

  1. 執行緒數量多於Partition的數量,有部分執行緒無法消費該topic下任何一條訊息

  2. 執行緒數量少於Partition的數量,有一些執行緒會消費多個Partition的資料

  3. 執行緒數量等於Partition的數量,則正好一個執行緒消費一個Partition的資料

圖4-7分別對應了上面的三種場景,正常情況下采用第二種是最好的,這種方案既不會有第一種的資源浪費想象存在,而且也不會像第三種那樣每個執行緒只負責一點點工作,通過讓一個執行緒消費多個Partition,最大化地榨取每個執行緒的勞動能力。舉例幼兒園的老師將一個蛋糕分成了四塊,如果剛好有四個小朋友則每個小朋友都只能分到一塊(但是每個人一塊可能都吃不飽);如果有五個小朋友,那麼有一個小朋友就要眼睜睜地看大家吃蛋糕了(那些分到蛋糕的小朋友很慶幸至少有蛋糕吃);如果有兩個小朋友,那他們就可開心了,因為這兩個小朋友都能吃到兩份蛋糕(任務的資源比消費者多,每個消費者分到不止一個資源,這是最好的情況)。

圖4-7 消費者執行緒和Partition的對應關係

雖然允許一個消費者執行緒消費多個Partition,但並不保證消費者接收到的訊息是完全有序的,不過消費同一個Partition的訊息則一定是有序的。圖4-8的左圖示例了消費者分配了Partition0和Partition1,有可能生產端寫入不同Partition的訊息速度不同,也有可能不同消費者執行緒之間的消費速度不同,到達消費者客戶端的訊息可能是Partition0和Partition1的訊息混雜在一起的,不過如果單單從Partition0或Partition1而言,日誌檔案中是什麼順序,接收到的也一定是同樣的順序,比如P0的①②③雖然和P1的①②③魚龍混雜,但並不會出現到達客戶端後P0的①②③變成了其他順序。

不過即使消費者每次讀取的是一個完整的Partition(實際上是不可能的,因為生產者不斷地往不同的Partition寫資料,消費者要消費多個Partition,怎麼判定完整地讀取了一個Partition呢),由於生產者寫訊息時也將訊息分散到多個Partition,輸入源這邊雖然保證了Partition級別的訊息有序性,但是所有Partition之間並不是有序的,這就導致了圖4-8右圖中消費者讀取多個Partition時從所有Partition級別上看訊息也不是嚴格有序的。

圖4-8 消費者讀取不同Partition訊息的順序性

生產者的提交日誌採用遞增的offset連同訊息內容一起寫入到本地日誌檔案,生產者客戶端本身不需要儲存offset相關的狀態,而消費者程序則要儲存消費訊息的offset,因此它是有狀態的,這樣消費者才能將訊息的消費進度儲存到ZK或者其他儲存系統中。在消費者客戶端程序儲存offset狀態的另一種決定是消費訊息採用消費者主動向服務端pull拉取資料,而不是服務端主動向消費者push資料。如果由服務端push推送資料給消費者,消費者只要負責接收資料就可以了,不需要儲存任何狀態,但是這種方式加重了服務端的負載,因為要在服務端記錄每條訊息要分配給哪個消費者,還要記錄消費者消費到哪裡了。消費進度是決定訊息是否會被重複處理的關鍵因素,如果沒有記錄進度,消費者讀取到哪裡就一無所知了。

圖4-9中左圖服務端主動push訊息給消費者就要在服務端記錄push給消費者的進度,右圖中消費者主動pull就在消費者端記錄拉取進度,誰掌握了主動權,誰就要負責儲存offset。消費者的pull還需要額外依賴外部的ZK,因為每個消費者都是獨立的個體,如果要獲取所有消費者的消費進度,就要向各個消費者輪詢,而使用一個統一的外部儲存,每個消費者都往儲存系統寫資料,讀取時只需要和儲存系統打交道即可,不過這種方式需要保證消費者將最新的消費進度即使地寫到儲存系統中,如果沒有及時寫入就有可能讀取出舊的消費進度了。

圖4-9 訊息的push和pull模型

服務端主動push並不需要外部儲存是因為服務端本身可以充當管理所有的消費者的角色,但是這種方式的缺點是push只保證把訊息推送出去,並沒有考慮消費者是否能夠及時地處理訊息,如果消費者處理不夠及時,服務端是否能夠感知到並且做出正確的響應比如採用ack機制或者backpressure背壓,這種方式實現起來總的來說比較複雜而且在服務端儲存所有消費者的消費進度也佔用一定的記憶體。而如果是消費者客戶端主動pull,消費者可以按照自己的消費能力消費訊息,正所謂能者多勞,效能強的自然消費的快點多點,效能差的消費的慢點少點也是可以接受的。

消費者客戶端主動pull並且記錄offset狀態實際上還有諸多好處,因為消費者可以自己控制offset,如果業務需要,它可以回退到某個offset重新處理訊息,或者訊息一下子太多處理不過來而又不想處理,可以前進到最近的offset那裡繼續開始消費。而如果在服務端記錄消費者的offset,這一切都無從談起,因為服務端無法做這種特殊的定製,即使加入了這樣的自定義邏輯,服務端的實現也會非常複雜。綜合上面這些因素,Kafka的消費者實現採用更高效更具擴充套件性的push模式消費訊息。

不過有時候應用程式從Kafka讀取資料,並不太關心訊息offset的處理,所以Kafka提供了兩種層次的客戶端API:1)Hight Level Consumer高階API提供了一個從Kafka消費資料的高層抽象,消費者客戶端程式碼不需要管理offset的提交,並且採用了消費組的自動負載均衡功能,確保消費者的增減不會影響訊息的消費;2)Low Level Consumer低階API通常針對特殊的消費邏輯(比如客消費者只想要消費某些特定的Partition),低階API的客戶端程式碼需要自己實現一些和Kafka服務端相關的底層邏輯,比如選擇Partition的Leader,處理Leader的故障轉移等。

表4-1中高階API主要使用了ConsumerGroup語義實現消費者的自動負責均衡,低階API主要針對SimpleConsumer,不過選舉Leader,拉取訊息這些都要自己去實現,實際應用中高階API雖然功能簡單但是用的還是比較多,畢竟越簡單的東西越不容易出問題。實際上高階API也會使用SimpleConsumer類完成訊息的拉取,不過其他的複雜工作都被封裝起來,對客戶端程式碼而言是透明的。

表4-1 消費者客戶端的高階API和低階API

上面我們從消費者談到消費組、消費者執行緒和Partition的關係、offset的外部儲存和push模式,有了這些基礎知識的鋪墊後,讀者最好帶著下面這些問題思考Kafka的消費者是如何實現的:

  1. 消費組管理所有消費者,消費者領取消費組分配的任務是通過讀取ZK完成的,消費者註冊ZK監聽器並觸發rebalance操作

  2. 消費者執行緒拉取Partition資料,一個消費者程序允許有多個執行緒,客戶端如何管理多個執行緒的訊息拉取

  3. 消費者拉取到訊息後,offset定時提交到ZK,那麼什麼時候會讀取offset:發生rebalance後[1],拉取訊息之前[2]

消費組狀態機

我們先假設初始時世界是混沌的還沒有盤古的開天闢地,協調者也是一片荒蕪人煙之地,沒有儲存任何狀態,因為消費組的初始狀態是Stable,在第一次的Rebalance時,正常的還沒有向消費組註冊過的消費者會執行狀態為Stable而且memberId=UNKNOWN_MEMBER_ID條件分支。在第一次Rebalance之後,每個消費者都分配到了一個成員編號,系統又會進入Stable穩定狀態(Stable穩定狀態包括兩種:一種是沒有任何消費者的穩定狀態,一種是有消費者的穩定狀態)。因為所有消費者在執行一次JoinGroup後並不是說系統就一直保持這種不變的狀態,有可能因為這樣或那樣的事件導致消費者要重新進行JoinGroup,這個時候因為之前JoinGroup過了每個消費者都是有成員編號的,處理方式肯定是不一樣的。

所以定義一種事件驅動的狀態機就很有必要了,這世界看起來是雜亂無章的,不過只要遵循著狀態機的規則(萬物生長的理論),任何事件都是有跡可循有路可走有條不紊地進行著。

private def doJoinGroup(group: GroupMetadata,memberId: String,clientId: String,

    clientHost: String,sessionTimeoutMs: Int,protocolType: String,

    protocols: List[(String, Array[Byte])],responseCallback: JoinCallback) {

  if (group.protocolType!=protocolType||!group.supportsProtocols(protocols.map(_._1).toSet)) {

    //protocolType對於消費者是consumer,注意這裡的協議型別和PartitionAssignor協議不同哦

    //協議型別目前總共就兩種消費者和Worker,而協議是PartitionAssignor分配演算法

    responseCallback(joinError(memberId, Errors.INCONSISTENT_GROUP_PROTOCOL.code))

  } else if (memberId != JoinGroupRequest.UNKNOWN_MEMBER_ID && !group.has(memberId)) {

    //如果當前組沒有記錄該消費者,而該消費者卻被分配了成員編號,則重置為未知成員,並讓消費者重試

    responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID.code))

  } else { group.currentState match {

    case Dead =>

      responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID.code))

    case PreparingRebalance =>

      if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) { //2.第二個消費者在這裡了!

        addMemberAndRebalance(sessionTimeoutMs, clientId, clientHost, 

          protocols, group, responseCallback)

      } else {

        val member = group.get(memberId)

        updateMemberAndRebalance(group, member, protocols, responseCallback)

      }

    case Stable =>

      if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) {  //1.初始時第一個消費者在這裡!

        //如果消費者成員編號是未知的,則向GroupMetadata註冊並被記錄下來

        addMemberAndRebalance(sessionTimeoutMs, clientId, clientHost, 

          protocols, group, responseCallback)

      } else { //3.第二次Rebalance時第一個消費者在這裡,此時要分Leader還是普通的消費者了

        val member = group.get(memberId)

        if (memberId == group.leaderId || !member.matches(protocols)) {

          updateMemberAndRebalance(group, member, protocols, responseCallback)

        } else {

          responseCallback(JoinGroupResult(members = Map.empty,memberId = memberId,

            generationId = group.generationId,subProtocol = group.protocol,

            leaderId = group.leaderId,errorCode = Errors.NONE.code))

        }

      }

    }

    if (group.is(PreparingRebalance))

      joinPurgatory.checkAndComplete(GroupKey(group.groupId))

  }

}

addMemberAndRebalance和updateMemberAndRebalance會建立或更新MemberMetadata,並且會嘗試呼叫prepareRebalance,消費組中只有一個消費者有機會呼叫prepareRebalance,並且一旦呼叫該方法,會將消費組狀態更改為PreparingRebalance,就會使得下一個消費者只能從case PreparingRebalance入口進去了,假設第一個消費者是從Stable進入的,它更改了狀態為PreparingRebalance,下一個消費者就不會從Stable進來的。不過進入Stable狀態還要判斷消費者是不是已經有了成員編號,通常是之前已經發生了Rebalance,這種影響也是比較巨大的,每個消費者走的路徑跟第一次的Rebalance是完全不同的迷宮地圖了。

1)第一次Rebalance如圖6-18的上半部分:

  1. 第一個消費者,狀態為Stable,沒有編號,addMemberAndRebalance,成為Leader,執行prepareRebalance,更改狀態為PreparingRebalance,建立DelayedJoin

  2. 第二個消費者,狀態為PreparingRebalance,沒有編號,addMemberAndRebalance(不執行prepareRebalance,因為在狀態改變成PreparingRebalance後就不會被執行了);後面的消費者同第二個

  3. 所有消費者都要等協調者收集完所有成員編號在DelayedJoin完成時才會收到JoinGroup響應

圖6-18 第一次和第二次Rebalance

2)第二次Rebalance,對於之前加入過的消費者都要成員編號如圖6-18的下半部分:

  1. 第一個消費者是Leader,狀態為Stable,有編號,updateMemberAndRebalance,更改狀態為PreparingRebalance,建立DelayedJoin

  2. 第二個消費者,狀態為PreparingRebalance,有編號,updateMemberAndRebalance;後面的消費者同第二個

  3. 所有消費者也要等待,因為其他消費者傳送Join請求在Leader消費者之後。

3)不過如果有消費者在Leader之前傳送又有點不一樣了如圖6-19:

  1. 第一個消費者不是Leader,狀態為Stable,有編號,responseCallback,立即收到JoinGroup響應,好幸運啊!

  2. 第二個消費者如果也不是Leader,恭喜你,協調者也放過他,直接返回JoinGroup響應

  3. 第三個消費者是Leader(領導來了),狀態為Stable(什麼,你們之前的消費者竟然都沒更新狀態!,因為他們都沒有add或update),有編號,updateMemberAndRebalance(還是我第一個呼叫add或update,看來還是隻能我來更新狀態),更改狀態為PreparingRebalance,建立DelayedJoin

  4. 第四個消費者不是Leader,狀態為PreparingRebalance,有編號,updateMemberAndRebalance(前面有領導,不好意思了,不能立即返回JoinGroup給你了,你們這些剩下的消費者都只能和領導一起返回了,算你們倒黴)

圖6-19 Leader非第一個傳送JoinGroup請求

4)如果第一個消費者不是Leader,也沒有編號,說明這是一個新增的消費者,流程又不同了如圖6-20:

  1. 第一個消費者不是Leader,狀態為Stable,沒有編號,addMemberAndRebalance,執行prepareRebalance(我是第一個呼叫add或update的哦,你們都別想跟我搶這個頭彩了),更改狀態為PreparingRebalance(我不是Leader但我驕傲啊),建立DelayedJoin(我搶到頭彩,當然建立DelayedJoin的工作只能由我來完成了)

  2. 第二個消費者也不是Leader,恭喜你,協調者也放過他,直接返回JoinGroup響應

  3. 第三個消費者是Leader(領導來了),狀態為PreparingRebalance(有個新來的不懂規矩,他已經把狀態改了),有編號,updateMemberAndRebalance(有人已經改了,你老就不用費心思了),凡是沒有立即返回響應的,都需要等待,領導也不例外

  4. 第四個消費者不是Leader(廢話,只有一個領導,而且領導已經在前面了),不會立即返回響應(你看領導都排隊呢)

  5. 雖然DelayedJoin是由沒有編號的消費者建立,不過由於DelayedJoin是以消費組為級別的,所以不用擔心,上一次選舉出來的領導還是領導,協調者最終還是會把members交給領導,不會是給那個沒有編號的消費者的,雖然說在他註冊的時候已經有編號了,但是大家不認啊。不過領導其實不在意是誰開始觸發prepareRebalance的,那個人要負責生成DelayedJoin,而不管是領導自己還是其他人一旦更改狀態為PreparingRebalance,後面的消費者都要等待DelayedJoin完成了,而領導者總是要等待的,所以他當然無所謂了,因為他知道最後協調者總是會把members交給他的。


圖6-20 新增消費組第一個傳送JoinGroup請求

根據上面的幾種場景總結下來狀態機的規則和一些結論如下:

  1. 第一個呼叫addMemberAndRebalance或者updateMemberAndRebalance的會將狀態改為PreparingRebalance,並且負責生成DelayedJoin

  2. 一旦狀態進入PreparingRebalance,其他消費者就只能從PreparingRebalance狀態入口進入,這裡只有兩種選擇addMemberAndRebalance或者updateMemberAndRebalance,不過他們不會更改狀態,也不會生成DelayedJoin

  3. 發生DelayedJoin之後,其他消費者的JoinGroup響應都會被延遲,因為如規則2中,他們只能呼叫add或update,無法立即呼叫responseCallback,所以就要和DelayedJoin的那個消費者一起等待

  4. 正常流程時,發生responseCallback的是存在成員編號的消費者在Leader之前傳送了JoinGroup,或者新增加的消費者傳送了JoinGroup請求之前

  5. 第一次Rebalance時,第一個消費者會建立DelayedJoin,之後的Rebalance,只有新增的消費者才有機會建立(如果他在Leader之前傳送的話,如果在Leader之後就沒有機會了),而普通消費者總是沒有機會建立DelayedJoin的,因為狀態為Stable時,他會直接開溜,有人(Leader或者新增加的消費者)建立了DelayedJoin之後,他又在那邊怨天尤人只能等待

轉載http://zqhxuyuan.github.io/2016/05/26/2016-05-13-Kafka-Book-Sample/#%E7%AC%AC%E4%BA%8C%E7%AB%A0_%E7%94%9F%E4%BA%A7%E8%80%85