1. 程式人生 > >Kafka 系列(五)—— 深入理解 Kafka 副本機制

Kafka 系列(五)—— 深入理解 Kafka 副本機制

一、Kafka叢集

Kafka 使用 Zookeeper 來維護叢集成員 (brokers) 的資訊。每個 broker 都有一個唯一標識 broker.id,用於標識自己在叢集中的身份,可以在配置檔案 server.properties 中進行配置,或者由程式自動生成。下面是 Kafka brokers 叢集自動建立的過程:

  • 每一個 broker 啟動的時候,它會在 Zookeeper 的 /brokers/ids 路徑下建立一個 臨時節點,並將自己的 broker.id 寫入,從而將自身註冊到叢集;
  • 當有多個 broker 時,所有 broker 會競爭性地在 Zookeeper 上建立 /controller
    節點,由於 Zookeeper 上的節點不會重複,所以必然只會有一個 broker 建立成功,此時該 broker 稱為 controller broker。它除了具備其他 broker 的功能外,還負責管理主題分割槽及其副本的狀態。
  • 當 broker 出現宕機或者主動退出從而導致其持有的 Zookeeper 會話超時時,會觸發註冊在 Zookeeper 上的 watcher 事件,此時 Kafka 會進行相應的容錯處理;如果宕機的是 controller broker 時,還會觸發新的 controller 選舉。

二、副本機制

為了保證高可用,kafka 的分割槽是多副本的,如果一個副本丟失了,那麼還可以從其他副本中獲取分割槽資料。但是這要求對應副本的資料必須是完整的,這是 Kafka 資料一致性的基礎,所以才需要使用 controller broker

來進行專門的管理。下面將詳解介紹 Kafka 的副本機制。

2.1 分割槽和副本

Kafka 的主題被分為多個分割槽 ,分割槽是 Kafka 最基本的儲存單位。每個分割槽可以有多個副本 (可以在建立主題時使用 replication-factor 引數進行指定)。其中一個副本是首領副本 (Leader replica),所有的事件都直接傳送給首領副本;其他副本是跟隨者副本 (Follower replica),需要通過複製來保持與首領副本資料一致,當首領副本不可用時,其中一個跟隨者副本將成為新首領。

2.2 ISR機制

每個分割槽都有一個 ISR(in-sync Replica) 列表,用於維護所有同步的、可用的副本。首領副本必然是同步副本,而對於跟隨者副本來說,它需要滿足以下條件才能被認為是同步副本:

  • 與 Zookeeper 之間有一個活躍的會話,即必須定時向 Zookeeper 傳送心跳;
  • 在規定的時間內從首領副本那裡低延遲地獲取過訊息。

如果副本不滿足上面條件的話,就會被從 ISR 列表中移除,直到滿足條件才會被再次加入。

這裡給出一個主題建立的示例:使用 --replication-factor 指定副本系數為 3,建立成功後使用 --describe 命令可以看到分割槽 0 的有 0,1,2 三個副本,且三個副本都在 ISR 列表中,其中 1 為首領副本。

2.3 不完全的首領選舉

對於副本機制,在 broker 級別有一個可選的配置引數 unclean.leader.election.enable,預設值為 fasle,代表禁止不完全的首領選舉。這是針對當首領副本掛掉且 ISR 中沒有其他可用副本時,是否允許某個不完全同步的副本成為首領副本,這可能會導致資料丟失或者資料不一致,在某些對資料一致性要求較高的場景 (如金融領域),這可能無法容忍的,所以其預設值為 false,如果你能夠允許部分資料不一致的話,可以配置為 true。

2.4 最少同步副本

ISR 機制的另外一個相關引數是 min.insync.replicas , 可以在 broker 或者主題級別進行配置,代表 ISR 列表中至少要有幾個可用副本。這裡假設設定為 2,那麼當可用副本數量小於該值時,就認為整個分割槽處於不可用狀態。此時客戶端再向分割槽寫入資料時候就會丟擲異常 org.apache.kafka.common.errors.NotEnoughReplicasExceptoin: Messages are rejected since there are fewer in-sync replicas than required。

2.5 傳送確認

Kafka 在生產者上有一個可選的引數 ack,該引數指定了必須要有多少個分割槽副本收到訊息,生產者才會認為訊息寫入成功:

  • acks=0 :訊息傳送出去就認為已經成功了,不會等待任何來自伺服器的響應;
  • acks=1 : 只要叢集的首領節點收到訊息,生產者就會收到一個來自伺服器成功響應;
  • acks=all :只有當所有參與複製的節點全部收到訊息時,生產者才會收到一個來自伺服器的成功響應。

三、資料請求

3.1 元資料請求機制

在所有副本中,只有領導副本才能進行訊息的讀寫處理。由於不同分割槽的領導副本可能在不同的 broker 上,如果某個 broker 收到了一個分割槽請求,但是該分割槽的領導副本並不在該 broker 上,那麼它就會向客戶端返回一個 Not a Leader for Partition 的錯誤響應。 為了解決這個問題,Kafka 提供了元資料請求機制。

首先叢集中的每個 broker 都會快取所有主題的分割槽副本資訊,客戶端會定期傳送傳送元資料請求,然後將獲取的元資料進行快取。定時重新整理元資料的時間間隔可以通過為客戶端配置 metadata.max.age.ms 來進行指定。有了元資料資訊後,客戶端就知道了領導副本所在的 broker,之後直接將讀寫請求傳送給對應的 broker 即可。

如果在定時請求的時間間隔內發生的分割槽副本的選舉,則意味著原來快取的資訊可能已經過時了,此時還有可能會收到 Not a Leader for Partition 的錯誤響應,這種情況下客戶端會再次求發出元資料請求,然後重新整理本地快取,之後再去正確的 broker 上執行對應的操作,過程如下圖:

3.2 資料可見性

需要注意的是,並不是所有儲存在分割槽首領上的資料都可以被客戶端讀取到,為了保證資料一致性,只有被所有同步副本 (ISR 中所有副本) 都儲存了的資料才能被客戶端讀取到。

3.3 零拷貝

Kafka 所有資料的寫入和讀取都是通過零拷貝來實現的。傳統拷貝與零拷貝的區別如下:

傳統模式下的四次拷貝與四次上下文切換

以將磁碟檔案通過網路傳送為例。傳統模式下,一般使用如下虛擬碼所示的方法先將檔案資料讀入記憶體,然後通過 Socket 將記憶體中的資料傳送出去。

buffer = File.read
Socket.send(buffer)

這一過程實際上發生了四次資料拷貝。首先通過系統呼叫將檔案資料讀入到核心態 Buffer(DMA 拷貝),然後應用程式將記憶體態 Buffer 資料讀入到使用者態 Buffer(CPU 拷貝),接著使用者程式通過 Socket 傳送資料時將使用者態 Buffer 資料拷貝到核心態 Buffer(CPU 拷貝),最後通過 DMA 拷貝將資料拷貝到 NIC Buffer。同時,還伴隨著四次上下文切換,如下圖所示:

sendfile和transferTo實現零拷貝

Linux 2.4+ 核心通過 sendfile 系統呼叫,提供了零拷貝。資料通過 DMA 拷貝到核心態 Buffer 後,直接通過 DMA 拷貝到 NIC Buffer,無需 CPU 拷貝。這也是零拷貝這一說法的來源。除了減少資料拷貝外,因為整個讀檔案到網路傳送由一個 sendfile 呼叫完成,整個過程只有兩次上下文切換,因此大大提高了效能。零拷貝過程如下圖所示:

從具體實現來看,Kafka 的資料傳輸通過 TransportLayer 來完成,其子類 PlaintextTransportLayertransferFrom 方法通過呼叫 Java NIO 中 FileChannel 的 transferTo 方法實現零拷貝,如下所示:

@Override
public long transferFrom(FileChannel fileChannel, long position, long count) throws IOException {
    return fileChannel.transferTo(position, count, socketChannel);
}

注: transferTotransferFrom 並不保證一定能使用零拷貝。實際上是否能使用零拷貝與作業系統相關,如果作業系統提供 sendfile 這樣的零拷貝系統呼叫,則這兩個方法會通過這樣的系統呼叫充分利用零拷貝的優勢,否則並不能通過這兩個方法本身實現零拷貝。

四、物理儲存

4.1 分割槽分配

在建立主題時,Kafka 會首先決定如何在 broker 間分配分割槽副本,它遵循以下原則:

  • 在所有 broker 上均勻地分配分割槽副本;
  • 確保分割槽的每個副本分佈在不同的 broker 上;
  • 如果使用了 broker.rack 引數為 broker 指定了機架資訊,那麼會盡可能的把每個分割槽的副本分配到不同機架的 broker 上,以避免一個機架不可用而導致整個分割槽不可用。

基於以上原因,如果你在一個單節點上建立一個 3 副本的主題,通常會丟擲下面的異常:

Error while executing topic command : org.apache.kafka.common.errors.InvalidReplicationFactor   
Exception: Replication factor: 3 larger than available brokers: 1.

4.2 分割槽資料保留規則

保留資料是 Kafka 的一個基本特性, 但是 Kafka 不會一直保留資料,也不會等到所有消費者都讀取了訊息之後才刪除訊息。相反, Kafka 為每個主題配置了資料保留期限,規定資料被刪除之前可以保留多長時間,或者清理資料之前可以保留的資料量大小。分別對應以下四個引數:

  • log.retention.bytes :刪除資料前允許的最大資料量;預設值-1,代表沒有限制;
  • log.retention.ms:儲存資料檔案的毫秒數,如果未設定,則使用 log.retention.minutes 中的值,預設為 null;
  • log.retention.minutes:保留資料檔案的分鐘數,如果未設定,則使用 log.retention.hours 中的值,預設為 null;
  • log.retention.hours:保留資料檔案的小時數,預設值為 168,也就是一週。

因為在一個大檔案裡查詢和刪除訊息是很費時的,也很容易出錯,所以 Kafka 把分割槽分成若干個片段,當前正在寫入資料的片段叫作活躍片段。活動片段永遠不會被刪除。如果按照預設值保留資料一週,而且每天使用一個新片段,那麼你就會看到,在每天使用一個新片段的同時會刪除一個最老的片段,所以大部分時間該分割槽會有 7 個片段存在。

4.3 檔案格式

通常儲存在磁碟上的資料格式與生產者傳送過來訊息格式是一樣的。 如果生產者傳送的是壓縮過的訊息,那麼同一個批次的訊息會被壓縮在一起,被當作“包裝訊息”進行傳送 (格式如下所示) ,然後儲存到磁碟上。之後消費者讀取後再自己解壓這個包裝訊息,獲取每條訊息的具體資訊。

參考資料

  1. Neha Narkhede, Gwen Shapira ,Todd Palino(著) , 薛命燈 (譯) . Kafka 權威指南 . 人民郵電出版社 . 2017-12-26
  2. Kafka 高效能架構之道

更多大資料系列文章可以參見 GitHub 開源專案: 大資料入門指南