讀《Kafka權威指南》04 深入Kafka
叢集成員關係
Kafka 使用 Zookeeper 來維護叢集成員的資訊。每個 broker 都有一個唯一識別符號,這個識別符號可以在配置檔案裡指定,也可以自動生成。在 broker 啟動的時候,它通過建立臨時節點把自己的 ID 註冊到 Zookeeper。Kafka 元件訂閱 Zookeeper 的 /broker/ids 路徑。
控制器
控制器其實就是一個 broker。叢集裡第一個啟動的 broker 通過在 Zookeeper 裡建立一個臨時節點 /controller 讓自己成為控制器。其他 broker 在控制器節點上建立 Zookeeper watch 物件,如果控制器被關閉或者與 Zookeeper 斷開連線,它們會嘗試讓自己成為新的控制器。每個新選出的控制器通過 Zookeeper 的條件遞增操作獲得一個全新的、數值更大的 controller epoch。
控制器遍歷分割槽,並確定誰應該成為新首領,隨後,新首領開始處理生產者和消費者的請求,而跟隨者開始從首領那裡複製訊息。
簡而言之,Kafka 使用 Zookeeper 的臨時節點來選舉控制器,並在節點加入叢集或退出叢集時通知控制器。控制器負責在節點加入或離開叢集時進行分割槽首領選舉。控制器使用 epoch 來避免“腦裂”,“腦裂”是指兩個節點同時被認為自己是當前的控制器。
複製
複製功能是 Kafka 架構的核心。Kafka 把自己描述成一個分散式的、可分割槽的、可複製的提交日誌服務。
Kafka 使用主題來組織資料,每個主題被分為若干個分割槽,每個分割槽有多個副本。每個 broker 可以儲存成百上千個屬於不同主題和分割槽的副本。
副本有兩種型別:
-
首領副本:每個分割槽都有一個首領副本。為了保證一致性,所有生產者請求和消費者請求都會經過這個副本。
-
跟隨者副本:首領以外的副本都是跟隨者副本。跟隨者副本不處理來自客戶端的請求,它們唯一的任務就是從首領那麼複製訊息,保持與首領一致的狀態。
為了與首領保持同步,跟隨者向首領傳送獲取資料的請求,這種請求與消費者為了讀取訊息而傳送的請求是一樣的。請求訊息裡包含了跟隨者想要獲取訊息的偏移量,而這些偏移量總是有序的。通過檢視每個跟隨者請求的最新偏移量,首領就會知道每個跟隨者複製的進度。如果跟隨者在10s內沒有請求任何訊息,或者雖然在請求訊息,但是10s內沒有請求最新的資料,那麼它就被認為是不同步的。
處理請求
broker 的大部分工作是處理客戶端、分割槽副本和控制器傳送給分割槽首領的請求。Kafka 提供了一個二進位制協議(基於 TCP),指定了請求訊息的格式以及 broker 如何對請求作出響應。
broker 會在它所監聽的每一個埠上執行一個 Acceptor 執行緒,這個執行緒會建立一個連線,並把它交給 Processor 執行緒去處理。Processor 執行緒負責從客戶端獲取請求訊息,把它們放進請求佇列,然後從響應佇列獲取響應訊息,把它們傳送給客戶端。
生產請求和獲取請求都必須傳送給分割槽的首領副本。Kafka 客戶端負責把生產請求和獲取請求傳送到爭取的 broker 上。客戶端使用了另一種請求型別,也就是元資料請求。伺服器端的響應訊息裡指明瞭這些主題所包含的分割槽、每個分割槽都有哪些副本,以及哪個副本是首領。一般情況下,客戶端會把這些資訊快取起來。
Kafka 使用零複製技術向客戶端傳送訊息 —— 也就是說,Kafka 直接把訊息從檔案(或者更確切的說是 Linux 檔案系統快取)裡傳送到網路通道,而不需要經過任何中間緩衝區。
並不是所有儲存在分割槽首領上的資料都可以被客戶端讀取。大部分客戶端只能讀取已經被寫入所有同步部分的訊息。
物理儲存
Kafka 的基本儲存單元是分割槽。在配置 Kafka 的時候,管理員指定了一個使用者儲存分割槽的目錄清單。
分割槽分配
在建立主題時,Kafka 會決定如何在 broker 間分配分割槽。為分割槽和副本選好 broker 之後,會決定哪些分割槽使用哪些目錄。規則很簡單:計算每個目錄裡的分割槽數量,新的分割槽總是被新增到數量最小的哪個目錄裡。
Kafka 管理員為每個主題配置了資料保留期限,規定資料被刪除之前可以保留多長時間,或者清理資料之前可以保留資料量大小。
檔案管理
因為一個大檔案裡查詢和刪除訊息是很費時的,所以把分割槽分成若干個片段。在 broker 往分割槽寫入資料時,如果達到片段上限,就關閉當前檔案,並開啟一個新檔案。當前正在寫入資料的片段叫做活躍片段。
檔案格式
Kafka 把訊息和偏移量儲存在檔案裡。
索引
消費者可以從 Kafka 的任意可用偏移量位置開始讀取訊息。Kafka 為每個分割槽維護了一個索引。索引把偏移量對映到片段檔案和偏移量在檔案裡的位置。
索引也被分成片段,所以再刪除訊息時,也可以刪除相應的所以。如果索引出現損壞,Kafka 會通過重新讀取訊息並錄製偏移量和位置來重新生成索引。
清理的工作原理
每個日誌片段可以分為兩個部分:
-
乾淨的部分:這些訊息之前被清理過。
-
汙濁的部分:這些訊息是上一次清理之後寫入的。
如果在 Kafka 啟動時啟動了清理功能,每個 broker 會啟動一個清理管理器執行緒和多個清理執行緒,它們負責清理任務。為了清理分割槽,清理執行緒會讀取分割槽的汙濁部分,並在記憶體裡建立一個 map。map 裡的每個元素包含了訊息鍵的雜湊值和訊息的偏移量。