1. 程式人生 > >Kafka 和 ZooKeeper 的分散式訊息佇列

Kafka 和 ZooKeeper 的分散式訊息佇列

文章出處:https://blog.csdn.net/valada/article/details/80892612

訊息佇列中介軟體是分散式系統中重要的元件,主要解決應用耦合,非同步訊息,流量削鋒等問題。實現高效能,高可用,可伸縮和最終一致性架構,是大型分散式系統不可缺少的中介軟體。

訊息佇列在電商系統、訊息通訊、日誌收集等應用中扮演著關鍵作用,以阿里為例,其研發的訊息佇列(MQ)服務於阿里集團超過11年,在歷次天貓雙十一活動中支撐了萬億級的資料洪峰,為大規模交易提供了有力保障。

目前在生產環境,使用較多的訊息佇列有 ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ 等。本場 Chat 將介紹基於 Kafka 和 ZooKeeper 的分散式訊息佇列。

本場 Chat,您將清楚以下問題:

  1. Kafka,Zookeeper 是什麼?
  2. 基於 Kafka 和 ZooKeeper 的分散式訊息佇列架構是怎樣的?
  3. Kafka 為什麼要將 Topic 進行分割槽?
  4. 分散式訊息佇列中 Zookeeper 以怎樣的形式存在,起什麼作用?
  5. 訊息佇列釋出-訂閱全流程是怎樣的?

特別說明:本場Chat僅僅作為分享,不足之處,還請讀者包容,謝謝

1. 前言

訊息佇列中介軟體是分散式系統中重要的元件,主要解決應用耦合,非同步訊息,流量削鋒等問題。實現高效能,高可用,可伸縮和最終一致性架構,是大型分散式系統不可缺少的中介軟體。

訊息佇列在電商系統、訊息通訊、日誌收集等應用中扮演著關鍵作用,以阿里為例,其研發的訊息佇列 (MQ) 服務於阿里集團超過 11 年,在歷次天貓雙十一活動中支撐了萬億級的資料洪峰,為大規模交易提供了有力保障。

目前在生產環境,使用較多的訊息佇列有 ActiveMQ、RabbitMQ、ZeroMQ、Kafka、MetaMQ、RocketMQ 等。本場 Chat 將介紹基於 Kafka 和 ZooKeeper 的分散式訊息佇列 (Distributed Message Queue,下文簡稱 DMQ)。

如果你對訊息佇列的基本概念還不清楚的話,在正式開始之前,建議你看一組例項來直觀的感受一下訊息佇列:

點選檢視

2. 基於 Kafka 的 DMQ 總體架構

2.1 什麼是 Kafka?

Kafka 是最初由 Linkedin 公司開發,是一個分散式、支援分割槽的(partition)、多副本的(replica),基於 zookeeper 協調的分散式訊息系統,它的最大的特性就是可以實時的處理大量資料以滿足各種需求場景:比如基於 hadoop 的批處理系統、低延遲的實時系統、storm/Spark 流式處理引擎,web/nginx 日誌、訪問日誌,訊息服務等等,用 scala 語言編寫,Linkedin 於 2010 年貢獻給了 Apache 基金會併成為頂級開源專案。 目前,Kafka 官網已經將自己修正為一個分散式的流式處理平臺。本場 Chat 介紹的 DMQ 中,kafka 作為高吞吐量的分散式釋出訂閱訊息系統。

Kafka 的特性:

  • 高吞吐量、低延遲:kafka 每秒可以處理幾十萬條訊息,它的延遲最低只有幾毫秒,每個 topic 可以分多個 partition, consumer group 對 partition 進行 consume 操作;
  • 可擴充套件性:kafka 叢集支援熱擴充套件;
  • 永續性、可靠性:訊息被持久化到本地磁碟,並且支援資料備份防止資料丟失;
  • 容錯性:允許叢集中節點失敗(若副本數量為 n, 則允許 n-1 個節點失敗);
  • 高併發:支援數千個客戶端同時讀寫;

kafka 以叢集方式執行,producers 通過網路將訊息傳送到 kafka 叢集,叢集向消費者提供訊息(需要說明的是,訊息是由消費者主動消費的)。如下圖:

enter image description here

上文已經提到了分割槽 (partition),建立一個 topic 時,同時可以指定分割槽的數目,分割槽數越多,其吞吐量也越大,但是需要的資源也越多,同時也會導致更高的不可用性,kafka 在接收到生產者傳送的訊息之後,會根據均衡策略將訊息儲存到不同的分割槽中,如下圖:

enter image description here

2.2 DMQ 總體架構是怎樣的?

基於 kafka 的 DMQ 總體架構如下

enter image description here

DMQ 是基於 Kafka 架構建立的,在 Kafka 架構中,有幾個術語:

  1. Producer:生產者,即訊息傳送者,push 訊息到 Kafka 叢集中的 broker(就是 server)中。
  2. Broker:Kafka 叢集由多個 Kafka 例項 (Server) 組成,每個例項構成一個 broker,說白了就是伺服器。
  3. Topic:producer 向 kafka 叢集 push 的訊息會被歸於某一類別,即 Topic,這本質上只是一個邏輯概念,面向的物件是 producer 和 consumer,producer 只需要關係將訊息 push 到哪一個 Topic 中,而 consumer 只需要關心自己訂閱了哪個 Topic。
  4. Partition:每一個 Topic 又被分為多個 Partitions,即物理分割槽;出於負載均衡的考慮,同一個 Topic 的 Partitions 分別儲存於 Kafka 叢集的多個 broker 上;而為了提高可靠性,這些 Partitions 可以由 Kafka 機制中的 replicas 來設定備份的數量;如上面的框架圖所示,存在兩個備份。
  5. Consumer:消費者,從 Kafka 叢集的 broker 中 pull 訊息、消費訊息。
  6. Consumer group:high-level consumer API 中,每個 consumer 都屬於一個 consumer group,每條訊息只能被 consumer group 中的一個 Consumer 消費,但可以被多個 consumer group 消費。
  7. replicas: partition 的副本,保障 partition 的高可用。
  8. leader:replicas 中的一個角色, producer 和 consumer 只跟 leader 互動。
  9. follower:replicas 中的一個角色,從 leader 中複製資料。
  10. controller:kafka 叢集中的其中一個伺服器,用來進行 leader election 以及 各種 failover。
  11. Zookeeper:kafka 通過 zookeeper 來儲存叢集的 meta 資訊。

注:kafka 叢集依賴於 Zookeeper 架構,藉助其優越的一致性、可靠性、實時性、原子性以及順序性來保障集群系統的可用性。早期版本的 kafka 用 Zookeeper 做 meta 資訊儲存:包括 consumerGroup/consumer、broker、Topic 等。鑑於 Zookeeper 本身的一些固有缺陷,新版本 Kafka 中弱化了對 zookeeper 的依賴,consumer 使用了 kafka 內部的 group coordination 協議。關於 Zookeeper,下文將詳細介紹。

3. 關於上述概念的理解

3.1 Kafka 為什麼要將 Topic 進行分割槽?

簡而言之:負載均衡 + 水平擴充套件。

前已述及,Topic 只是邏輯概念,面向的是 producer 和 consumer;而 Partition 則是物理概念。可以想象,如果 Topic 不進行分割槽,而將 Topic 內的訊息儲存於一個 broker,那麼,這個 broker 將會壓力過大而使得吞吐量陷入瓶頸,這顯然是不符合高吞吐量應用場景的。有了 Partition 概念以後,假設一個 Topic 被分為 10 個 Partitions,Kafka 會根據一定的演算法將 10 個 Partition 儘可能均勻的分佈到不同的 broker(伺服器)上,當 producer 釋出訊息時,producer 客戶端可以採用 “random”、“key-hash” 及 “輪詢” 等演算法選定目標 partition,若不指定,Kafka 也將根據一定演算法將其置於某一分割槽上。Partiton 機制可以極大的提高吞吐量,並且使得系統具備良好的水平擴充套件能力。補充一點:Kafka 機制中,producer push 來的訊息是追加(append)到 partition 中的,這是一種順序寫磁碟的機制,效率遠高於隨機寫記憶體,況且,還有多個分割槽的存在,這種機制保障了 Kafka 的高吞吐率。

3.2 什麼是 Zookeeper?

ZooKeeper,字面意為 “動物園管理員”。動物園裡有各種動物,為了讓不同種類的動物 “和諧” 相處,為遊客提供良好的觀賞服務,就需要動物園管理員按照動物的習性加以分類和管理。 在企業級應用中,通常各子系統不是孤立存在的,它們彼此之間需要協作和互動,即所謂的分散式系統。各個子系統就好比動物園裡的動物,為了使各個子系統能正常為使用者提供統一的服務,必需一種機制來進行協調——ZooKeeper。

如下圖所示為 Zookeeper 架構:

enter image description here

Zookeeper 叢集主要角色有 Leader,Learner(Follower,Observer(當伺服器增加到一定程度,由於投票的壓力增大從而使得吞吐量降低,所以增加了 Observer。) 以及 client:

enter image description here

官方說明: ZooKeeper 是一個分散式的,開放原始碼的分散式應用程式協調服務,是 Google 的 Chubby 一個開源的實現。分散式應用程式可以基於它實現統一命名服務、狀態同步服務、叢集管理、分散式應用配置項的管理等工作。

Zookeeper 的核心是原子廣播,這個機制保證了各個 Server 之間的同步。實現這個機制的協議叫做 Zab 協議。Zab 協議有兩種模式,它們分別是恢復模式(選主)和廣播模式(同步)。當服務啟動或者在領導者崩潰後,Zab 就進入了恢復模式,當領導者被選舉出來,且大多數 Server 完成了和 leader 的狀態同步以後,恢復模式就結束了。狀態同步保證了 leader 和 Server 具有相同的系統狀態。

3.3 Zookeeper 能做什麼?

簡而言之:檔案系統 + 通知機制

1. 檔案系統Zookeeper 維護一個類似檔案系統的資料結構:

enter image description here

每個子目錄項如 NameService 都被稱作為 znode,和檔案系統一樣,我們能夠自由的增加、刪除 znode,在一個 znode 下增加、刪除子 znode,唯一的不同在於 znode 是可以儲存資料的。

znode 型別 描述
PERSISTENT 持久化目錄節點 ,客戶端與 zookeeper 斷開連線後,該節點依舊存在
PERSISTENT_SEQUENTIAL 持久化順序編號目錄節點,客戶端與 zookeeper 斷開連線後,該節點依舊存在,只是 Zookeeper 給該節點名稱進行順序編號
EPHEMERAL 臨時目錄節點,客戶端與 zookeeper 斷開連線後,該節點被刪除
EPHEMERAL_SEQUENTIAL 臨時順序編號目錄節點,客戶端與 zookeeper 斷開連線後,該節點被刪除,只是 Zookeeper 給該節點名稱進行順序編號

有四種類型的 znode:

znode 型別 描述
PERSISTENT 持久化目錄節點 ,客戶端與 zookeeper 斷開連線後,該節點依舊存在
PERSISTENT_SEQUENTIAL 持久化順序編號目錄節點,客戶端與 zookeeper 斷開連線後,該節點依舊存在,只是 Zookeeper 給該節點名稱進行順序編號
EPHEMERAL 臨時目錄節點,客戶端與 zookeeper 斷開連線後,該節點被刪除
EPHEMERAL_SEQUENTIAL 臨時順序編號目錄節點,客戶端與 zookeeper 斷開連線後,該節點被刪除,只是 Zookeeper 給該節點名稱進行順序編號

2. 通知機制客戶端註冊監聽它關心的目錄節點,當目錄節點發生變化(資料改變、被刪除、子目錄節點增加刪除)時,zookeeper 會通知客戶端。

3. 具體用途

  • 命名服務

在分散式系統中,經常需要給一個資源生成一個唯一的 ID,在沒有中心管理結點的情況下生成這個 ID 並不是一件很容易的事兒。zookeeper 就提供了這樣一個命名服務。

  • 配置管理

主要用於多個結點共享配置,並且在配置發生更新時,利用 zookeeper 可以讓這些使用了這些配置的結點獲得通知,進行重新載入等操作。

  • 叢集管理

所謂叢集管理主要有兩點:節點退出和加入、選舉 master。

  • 分散式鎖

有了 zookeeper 的一致性檔案系統,鎖的問題變得容易。鎖服務可以分為兩類,一個是保持獨佔,另一個是控制時序。

  • 其它略

3.4 Kafka 架構中 Zookeeper 以怎樣的形式存在?

1. broker 在 zookeeper 中的註冊

  • 為了記錄 broker 的註冊資訊,在 zookeeper 上,專門建立了屬於 kafka 的一個節點,其路徑為 / brokers。

  • Kafka 的每個 broker 啟動時,都會到 zookeeper 中進行註冊,告訴 zookeeper 其 broker.id, 在整個叢集中,broker.id 應該全域性唯一,並在 zookeeper 上建立其屬於自己的節點,其節點路徑為 / brokers/ids/{broker.id}.

  • 建立完節點後,kafka 會將該 broker 的 broker.name 及埠號記錄到該節點。

  • 另外,該 broker 節點屬性為臨時節點,當 broker 會話失效時,zookeeper 會刪除該節點,這樣,我們就可以很方便的監控到 broker 節點的變化,及時調整負載均衡等。

2. Topic 在 zookeeper 中的註冊在 kafka 中,所有 topic 與 broker 的對應關係都由 zookeeper 進行維護,在 zookeeper 中,建立專門的節點來記錄這些資訊,其節點路徑為 / brokers/topics/{topic_name}。前面說過,為了保障資料的可靠性,每個 Topic 的 Partitions 實際上是存在備份的,並且備份的數量由 Kafka 機制中的 replicas 來控制。那麼問題來了:如下圖所示,假設某個 TopicA 被分為 2 個 Partitions,並且存在兩個備份,由於這 2 個 Partitions(1-2)被分佈在不同的 broker 上,同一個 partiton 與其備份不能(也不應該)儲存於同一個 broker 上。以 Partition1 為例,假設它被儲存於 broker2,其對應的備份分別儲存於 broker1 和 broker4,有了備份,可靠性得到保障,但資料一致性卻是個問題。

enter image description here

為了保障資料的一致性,Zookeeper 機制得以引入。基於 Zookeeper,Kafka 為每一個 partition 找一個節點作為 Leader,其餘備份作為 Follower;接續上圖的例子,就 TopicA 的 partition1 而言,如果位於 broker2(Kafka 節點)上的 partition1 為 Leader,那麼位於 broker1 和 broker4 上面的 partition1 就充當 Follower,則有下圖:

enter image description here

基於上圖的架構,當 producer push 的訊息寫入 partition(分割槽) 時,作為 Leader 的 broker(Kafka 節點) 會將訊息寫入自己的分割槽,同時還會將此訊息複製到各個 Follower,實現同步。如果,某個 Follower 掛掉,Leader 會再找一個替代並同步訊息;如果 Leader 掛了,Follower 們會選舉出一個新的 Leader 替代,繼續業務,這些都是由 zookeeper 完成的。

3. consumer 在 zookeeper 中的註冊

  • 註冊新的消費者分組

當新的消費者組註冊到 zookeeper 中時,zookeeper 會建立專用的節點來儲存相關資訊,其節點路徑為 ls /consumers/{group_id},其節點下有三個子節點,分別為 [ids, owners, offsets]。

  1. ids 節點:記錄該消費組中當前正在消費的消費者;
  2. owners 節點:記錄該消費組消費的 topic 資訊;
  3. offsets 節點:記錄每個 topic 的每個分割槽的 offset,
  • 註冊新的消費者

當新的消費者註冊到 kafka 中時,會在 / consumers/{group_id}/ids 節點下建立臨時子節點,並記錄相關資訊

  • 監聽消費者分組中消費者的變化

每個消費者都要關注其所屬消費者組中消費者數目的變化,即監聽 / consumers/{group_id}/ids 下子節點的變化。一單發現消費者新增或減少,就會觸發消費者的負載均衡。

3.5 誰來充當 Zookeeper 的 Client?

很明顯 Zookeeper 機制在 Kafka 架構中是用來管理 Topic 的 Partition 的,而 Topic 直接關聯的是 producer 和 consumer:producer 將訊息 push 到對應 Topic 的 partition,consumer 從訂閱的 Topic 中 pull 訊息。

producer 充當了 Client:push 訊息,實質上是 “寫” 操作,Zookeeper 機制中,只有 Leader 才能執行 “寫” 操作,Leader 將訊息寫入本節點 (broker) 的對應 partition 後,再將訊息同步到各個 Follower;Consumer 的 pull 訊息,實質上就是 “讀” 操作,不過,這個 “讀” 操作並不簡單。

3.6 Kafka 的訊息 - 訂閱模式是同步 OR 非同步?

顯然是非同步的:producer 向 Kafka 叢集 push 的訊息並不是實時傳送到 consumer 的,producer push 到 Kafka 叢集 partition 中的訊息是以 “追加”(Append) 的形式順序寫入磁碟;訊息到達 consumer 端卻並不是 “推送” 的,而是 consumer 主動 pull(fetch 請求)的,consumer 可以根據自己的消費能力去 fetch 訊息並執行處理,而且可以根據 offset(偏移量)來控制消費的進度。

此外,在 Kafka 中,一個 Consumer-Group 中只有一個消費者能夠消費某個 broker partition 中的訊息,

4. 全程解析(Producer-kafka-consumer)

4.1 producer 釋出訊息

producer 採用 push 模式將訊息釋出到 broker,每條訊息都被 append 到 patition 中,屬於順序寫磁碟(順序寫磁碟效率比隨機寫記憶體要高,保障 kafka 吞吐率)。producer 傳送訊息到 broker 時,會根據分割槽演算法選擇將其儲存到哪一個 partition。

其路由機制為:

  1. 指定了 patition,則直接使用;
  2. 未指定 patition 但指定 key,通過對 key 的 value 進行 hash 選出一個 patition
  3. patition 和 key 都未指定,使用輪詢選出一個 patition。

寫入流程:

  1. producer 先從 zookeeper 的 "/brokers/.../state" 節點找到該 partition 的 leader
  2. producer 將訊息傳送給該 leader
  3. leader 將訊息寫入本地 log
  4. followers 從 leader pull 訊息,寫入本地 log 後 leader 傳送 ACK
  5. leader 收到所有 ISR 中的 replica 的 ACK 後,增加 HW(high watermark,最後 commit 的 offset) 並向 producer 傳送 ACK

4.2 Broker 儲存訊息

物理上把 topic 分成一個或多個 patition,每個 patition 物理上對應一個資料夾(該資料夾儲存該 patition 的所有訊息和索引檔案)

4.3 Consumer 消費訊息

high-level consumer API 提供了 consumer group 的語義,一個訊息只能被 group 內的一個 consumer 所消費,且 consumer 消費訊息時不關注 offset,最後一個 offset 由 zookeeper 儲存。

注意:

  1. 如果消費執行緒大於 patition 數量,則有些執行緒將收不到訊息;
  2. 如果 patition 數量大於執行緒數,則有些執行緒多收到多個 patition 的訊息;
  3. 如果一個執行緒消費多個 patition,則無法保證你收到的訊息的順序,而一個 patition 內的訊息是有序的。

consumer 採用 pull 模式從 broker 中讀取資料。

push 模式很難適應消費速率不同的消費者,因為訊息傳送速率是由 broker 決定的。它的目標是儘可能以最快速度傳遞訊息,但是這樣很容易造成 consumer 來不及處理訊息,典型的表現就是拒絕服務以及網路擁塞。而 pull 模式則可以根據 consumer 的消費能力以適當的速率消費訊息。

對於 Kafka 而言,pull 模式更合適,它可簡化 broker 的設計,consumer 可自主控制消費訊息的速率,同時 consumer 可以自己控制消費方式——即可批量消費也可逐條消費,同時還能選擇不同的提交方式從而實現不同的傳輸語義。

5. 補充

5.1 consumer-Group

kafka 的分配單位是 patition。每個 consumer 都屬於一個 group,一個 partition 只能被同一個 group 內的一個 consumer 所消費(也就保障了一個訊息只能被 group 內的一個 consuemr 所消費),但是多個 group 可以同時消費這個 partition。

5.2 consumer rebalance

當有 consumer 加入或退出、以及 partition 的改變(如 broker 加入或退出)時會觸發 rebalance。具體演算法略。

5.3 controller failover

當 controller 宕機時會觸發 controller failover。每個 broker 都會在 zookeeper 的 "/controller" 節點註冊 watcher,當 controller 宕機時 zookeeper 中的臨時節點消失,所有存活的 broker 收到 fire 的通知,每個 broker 都嘗試建立新的 controller path,只有一個競選成功並當選為 controller。

5.4 broker failover

  1. controller 在 zookeeper 的 /brokers/ids/[brokerId] 節點註冊 Watcher,當 broker 宕機時 zookeeper 會 fire watch。
  2. controller 從 /brokers/ids 節點讀取可用 broker
  3. controller 決定 set_p,該集合包含宕機 broker 上的所有 partition
  4. 對 set_p 中的每一個 partition:
從 /brokers/topics/[topic]/partitions/[partition]/state 節點讀取 ISR決定新 leader(如 4.3 節所描述)將新 leader、ISR、controller_epoch 和 leader_epoch 等資訊寫入 state 節點通過 RPC 向相關 broker 傳送 leaderAndISRRequest 命令
   

5.5 leader failover

當 partition 對應的 leader 宕機時,需要從 follower 中選舉出新 leader。在選舉新 leader 時,一個基本的原則是,新的 leader 必須擁有舊 leader commit 過的所有訊息。

kafka 在 zookeeper 中(/brokers/.../state)動態維護了一個 ISR(in-sync replicas),ISR 裡面的所有 replica 都跟上 leader,只有 ISR 裡面的成員才能選為 leader。

參考文獻

  1. https://www.zhihu.com/question/35139415
  2. https://blog.csdn.net/cws1214/article/details/52922267

本文首發於GitChat,未經授權不得轉載,轉載需與GitChat聯絡。

閱讀全文: http://gitbook.cn/gitchat/activity/5ad5634e1165247fd990c306

一場場看太麻煩?成為 GitChat 會員,暢享 1000+ 場 Chat !點選檢視