1. 程式人生 > >Kafka設計解析(六)Kafka高性能架構之道

Kafka設計解析(六)Kafka高性能架構之道

持久化 edt 可見 enc linu 定義 通信 index posit

轉載自 技術世界,原文鏈接 Kafka設計解析(六)- Kafka高性能架構之道

本文從宏觀架構層面和微觀實現層面分析了Kafka如何實現高性能。包含Kafka如何利用Partition實現並行處理和提供水平擴展能力,如何通過ISR實現可用性和數據一致性的動態平衡,如何使用NIO和Linux的sendfile實現零拷貝以及如何通過順序讀寫和數據壓縮實現磁盤的高效利用。

摘要

上一篇文章《Kafka設計解析(五)Kafka性能測試方法及Benchmark報告》從測試角度說明了Kafka的性能。本文從宏觀架構層面和具體實現層面分析了Kafka如何實現高性能。

目錄

一、宏觀架構層面

1. 利用Partition實現並行處理

1.1 Partition提供並行處理的能力

1.2 Partition是最小並發粒度

2. ISR實現可用性與數據一致性的動態平衡

2.1 CAP理論

2.2 常用數據復制及一致性方案

2.3 使用ISR方案的原因

2.4 ISR相關配置說明

二、具體實現層面

1. 高效使用磁盤

1.1 順序寫磁盤

1.2 充分利用Page Cache

1.3 支持多Disk Drive

2. 零拷貝

2.1 傳統模式下的四次拷貝與四次上下文切換

2.2 sendfile和transferTo實現零拷貝

3. 減少網絡開銷

3.1 批處理

3.2 數據壓縮降低網絡負載

3.3 高效的序列化方式

一、宏觀架構層面

1. 利用Partition實現並行處理

1.1 Partition提供並行處理的能力

Kafka是一個Pub-Sub的消息系統,無論是發布還是訂閱,都須指定Topic。如《Kafka設計解析(一)Kafka背景及架構介紹》一文所述,Topic只是一個邏輯的概念。每個Topic都包含一個或多個Partition,不同Partition可位於不同節點。同時Partition在物理上對應一個本地文件夾,每個Partition包含一個或多個Segment,每個Segment包含一個數據文件和一個與之對應的索引文件。在邏輯上,可以把一個Partition當作一個非常長的數組,可通過這個“數組”的索引(offset)去訪問其數據。

一方面,由於不同Partition可位於不同機器,因此可以充分利用集群優勢,實現機器間的並行處理。另一方面,由於Partition在物理上對應一個文件夾,即使多個Partition位於同一個節點,也可通過配置讓同一節點上的不同Partition置於不同的disk drive上,從而實現磁盤間的並行處理,充分發揮多磁盤的優勢。

利用多磁盤的具體方法是,將不同磁盤mount到不同目錄,然後在server.properties中,將log.dirs設置為多目錄(用逗號分隔)。Kafka會自動將所有Partition盡可能均勻分配到不同目錄也即不同目錄(也即不同disk)上。

註:雖然物理上最小單位是Segment,但Kafka並不提供同一Partition內不同Segment間的並行處理。因為對於寫而言,每次只會寫Partition內的一個Segment,而對於讀而言,也只會順序讀取同一Partition內的不同Segment。

1.2 Partition是最小並發粒度

如同《Kafka設計解析(四)Kafka Consumer設計解析》一文所述,多Consumer消費同一個Topic時,同一條消息只會被同一Consumer Group內的一個Consumer所消費。而數據並非按消息為單位分配,而是以Partition為單位分配,也即同一個Partition的數據只會被一個Consumer所消費(在不考慮Rebalance的前提下)。

如果Consumer的個數多於Partition的個數,那麽會有部分Consumer無法消費該Topic的任何數據,也即當Consumer個數超過Partition後,增加Consumer並不能增加並行度。

簡而言之,Partition個數決定了可能的最大並行度。如下圖所示,由於Topic 2只包含3個Partition,故group2中的Consumer 3、Consumer 4、Consumer 5 可分別消費1個Partition的數據,而Consumer 6消費不到Topic 2的任何數據。

技術分享圖片

以Spark消費Kafka數據為例,如果所消費的Topic的Partition數為N,則有效的Spark最大並行度也為N。即使將Spark的Executor數設置為N+M,最多也只有N個Executor可同時處理該Topic的數據。

2. ISR實現可用性與數據一致性的動態平衡

2.1 CAP理論

CAP理論是指,分布式系統中,一致性、可用性和分區容忍性最多只能同時滿足兩個。

一致性

  • 通過某個節點的寫操作結果對後面通過其它節點的讀操作可見
  • 如果更新數據後,並發訪問情況下後續讀操作可立即感知該更新,稱為強一致性
  • 如果允許之後部分或者全部感知不到該更新,稱為弱一致性
  • 若在之後的一段時間(通常該時間不固定)後,一定可以感知到該更新,稱為最終一致性

可用性

  • 任何一個沒有發生故障的節點必須在有限的時間內返回合理的結果

分區容忍性

  • 部分節點宕機或者無法與其它節點通信時,各分區間還可保持分布式系統的功能

一般而言,都要求保證分區容忍性。所以在CAP理論下,更多的是需要在可用性和一致性之間做權衡。

2.2 常用數據復制及一致性方案

Master-Slave

  • RDBMS的讀寫分離即為典型的Master-Slave方案
  • 同步復制可保證強一致性但會影響可用性
  • 異步復制可提供高可用性但會降低一致性

WNR

  • 主要用於去中心化的分布式系統中。DynamoDB與Cassandra即采用此方案或其變種
  • N代表總副本數,W代表每次寫操作要保證的最少寫成功的副本數,R代表每次讀至少要讀取的副本數
  • 當W+R>N時,可保證每次讀取的數據至少有一個副本擁有最新的數據
  • 多個寫操作的順序難以保證,可能導致多副本間的寫操作順序不一致。Dynamo通過向量時鐘保證最終一致性

Paxos及其變種

  • Google的Chubby,Zookeeper的原子廣播協議(Zab),RAFT等

基於ISR的數據復制方案

如《 Kafka High Availability(上)》一文所述,Kafka的數據復制是以Partition為單位的。而多個備份間的數據復制,通過Follower向Leader拉取數據完成。從一這點來講,Kafka的數據復制方案接近於上文所講的Master-Slave方案。不同的是,Kafka既不是完全的同步復制,也不是完全的異步復制,而是基於ISR的動態復制方案。

ISR,也即In-sync Replica。每個Partition的Leader都會維護這樣一個列表,該列表中,包含了所有與之同步的Replica(包含Leader自己)。每次數據寫入時,只有ISR中的所有Replica都復制完,Leader才會將其置為Commit,它才能被Consumer所消費。

這種方案,與同步復制非常接近。但不同的是,這個ISR是由Leader動態維護的。如果Follower不能緊“跟上”Leader,它將被Leader從ISR中移除,待它又重新“跟上”Leader後,會被Leader再次加加ISR中。每次改變ISR後,Leader都會將最新的ISR持久化到Zookeeper中。

至於如何判斷某個Follower是否“跟上”Leader,不同版本的Kafka的策略稍微有些區別。

  • 對於0.8.*版本,如果Follower在replica.lag.time.max.ms時間內未向Leader發送Fetch請求(也即數據復制請求),則Leader會將其從ISR中移除。如果某Follower持續向Leader發送Fetch請求,但是它與Leader的數據差距在replica.lag.max.messages以上,也會被Leader從ISR中移除。
  • 從0.9.0.0版本開始,replica.lag.max.messages被移除,故Leader不再考慮Follower落後的消息條數。另外,Leader不僅會判斷Follower是否在replica.lag.time.max.ms時間內向其發送Fetch請求,同時還會考慮Follower是否在該時間內與之保持同步。
  • 0.10.* 版本的策略與0.9.*版一致

對於0.8.*版本的replica.lag.max.messages參數,很多讀者曾留言提問,既然只有ISR中的所有Replica復制完後的消息才被認為Commit,那為何會出現Follower與Leader差距過大的情況。原因在於,Leader並不需要等到前一條消息被Commit才接收後一條消息。事實上,Leader可以按順序接收大量消息,最新的一條消息的Offset被記為High Wartermark。而只有被ISR中所有Follower都復制過去的消息才會被Commit,Consumer只能消費被Commit的消息。由於Follower的復制是嚴格按順序的,所以被Commit的消息之前的消息肯定也已經被Commit過。換句話說,High Watermark標記的是Leader所保存的最新消息的offset,而Commit Offset標記的是最新的可被消費的(已同步到ISR中的Follower)消息。而Leader對數據的接收與Follower對數據的復制是異步進行的,因此會出現Commit Offset與High Watermark存在一定差距的情況。0.8.*版本中replica.lag.max.messages限定了Leader允許的該差距的最大值。

Kafka基於ISR的數據復制方案原理如下圖所示。

技術分享圖片

如上圖所示,在第一步中,Leader A總共收到3條消息,故其high watermark為3,但由於ISR中的Follower只同步了第1條消息(m1),故只有m1被Commit,也即只有m1可被Consumer消費。此時Follower B與Leader A的差距是1,而Follower C與Leader A的差距是2,均未超過默認的replica.lag.max.messages,故得以保留在ISR中。在第二步中,由於舊的Leader A宕機,新的Leader B在replica.lag.time.max.ms時間內未收到來自A的Fetch請求,故將A從ISR中移除,此時ISR={B,C}。同時,由於此時新的Leader B中只有2條消息,並未包含m3(m3從未被任何Leader所Commit),所以m3無法被Consumer消費。第四步中,Follower A恢復正常,它先將宕機前未Commit的所有消息全部刪除,然後從最後Commit過的消息的下一條消息開始追趕新的Leader B,直到它“趕上”新的Leader,才被重新加入新的ISR中。

2.3 使用ISR方案的原因

  • 由於Leader可移除不能及時與之同步的Follower,故與同步復制相比可避免最慢的Follower拖慢整體速度,也即ISR提高了系統可用性。
  • ISR中的所有Follower都包含了所有Commit過的消息,而只有Commit過的消息才會被Consumer消費,故從Consumer的角度而言,ISR中的所有Replica都始終處於同步狀態,從而與異步復制方案相比提高了數據一致性。
  • ISR可動態調整,極限情況下,可以只包含Leader,極大提高了可容忍的宕機的Follower的數量。與Majority Quorum方案相比,容忍相同個數的節點失敗,所要求的總節點數少了近一半。

2.4 ISR相關配置說明

  • Broker的min.insync.replicas參數指定了Broker所要求的ISR最小長度,默認值為1。也即極限情況下ISR可以只包含Leader。但此時如果Leader宕機,則該Partition不可用,可用性得不到保證。
  • 只有被ISR中所有Replica同步的消息才被Commit,但Producer發布數據時,Leader並不需要ISR中的所有Replica同步該數據才確認收到數據。Producer可以通過acks參數指定最少需要多少個Replica確認收到該消息才視為該消息發送成功。acks的默認值是1,即Leader收到該消息後立即告訴Producer收到該消息,此時如果在ISR中的消息復制完該消息前Leader宕機,那該條消息會丟失。而如果將該值設置為0,則Producer發送完數據後,立即認為該數據發送成功,不作任何等待,而實際上該數據可能發送失敗,並且Producer的Retry機制將不生效。更推薦的做法是,將acks設置為all或者-1,此時只有ISR中的所有Replica都收到該數據(也即該消息被Commit),Leader才會告訴Producer該消息發送成功,從而保證不會有未知的數據丟失。

二、具體實現層面

1. 高效使用磁盤

1.1 順序寫磁盤

根據《一些場景下順序寫磁盤快於隨機寫內存》所述,將寫磁盤的過程變為順序寫,可極大提高對磁盤的利用率。

Kafka的整個設計中,Partition相當於一個非常長的數組,而Broker接收到的所有消息順序寫入這個大數組中。同時Consumer通過Offset順序消費這些數據,並且不刪除已經消費的數據,從而避免了隨機寫磁盤的過程。

由於磁盤有限,不可能保存所有數據,實際上作為消息系統Kafka也沒必要保存所有數據,需要刪除舊的數據。而這個刪除過程,並非通過使用“讀-寫”模式去修改文件,而是將Partition分為多個Segment,每個Segment對應一個物理文件,通過刪除整個文件的方式去刪除Partition內的數據。這種方式清除舊數據的方式,也避免了對文件的隨機寫操作。

通過如下代碼可知,Kafka刪除Segment的方式,是直接刪除Segment對應的整個log文件和整個index文件而非刪除文件中的部分內容。

/**
 * Delete this log segment from the filesystem.
 *
 * @throws KafkaStorageException if the delete fails.
 */
def delete() {
  val deletedLog = log.delete()
  val deletedIndex = index.delete()
  val deletedTimeIndex = timeIndex.delete()
  if(!deletedLog && log.file.exists)
    throw new KafkaStorageException("Delete of log " + log.file.getName + " failed.")
  if(!deletedIndex && index.file.exists)
    throw new KafkaStorageException("Delete of index " + index.file.getName + " failed.")
  if(!deletedTimeIndex && timeIndex.file.exists)
    throw new KafkaStorageException("Delete of time index " + timeIndex.file.getName + " failed.")
}

1.2 充分利用Page Cache

使用Page Cache的好處如下

  • I/O Scheduler會將連續的小塊寫組裝成大塊的物理寫從而提高性能
  • I/O Scheduler會嘗試將一些寫操作重新按順序排好,從而減少磁盤頭的移動時間
  • 充分利用所有空閑內存(非JVM內存)。如果使用應用層Cache(即JVM堆內存),會增加GC負擔
  • 讀操作可直接在Page Cache內進行。如果消費和生產速度相當,甚至不需要通過物理磁盤(直接通過Page Cache)交換數據
  • 如果進程重啟,JVM內的Cache會失效,但Page Cache仍然可用

Broker收到數據後,寫磁盤時只是將數據寫入Page Cache,並不保證數據一定完全寫入磁盤。從這一點看,可能會造成機器宕機時,Page Cache內的數據未寫入磁盤從而造成數據丟失。但是這種丟失只發生在機器斷電等造成操作系統不工作的場景,而這種場景完全可以由Kafka層面的Replication機制去解決。如果為了保證這種情況下數據不丟失而強制將Page Cache中的數據Flush到磁盤,反而會降低性能。也正因如此,Kafka雖然提供了flush.messages和flush.ms兩個參數將Page Cache中的數據強制Flush到磁盤,但是Kafka並不建議使用。

如果數據消費速度與生產速度相當,甚至不需要通過物理磁盤交換數據,而是直接通過Page Cache交換數據。同時,Follower從Leader Fetch數據時,也可通過Page Cache完成。下圖為某Partition的Leader節點的網絡/磁盤讀寫信息。

技術分享圖片

從上圖可以看到,該Broker每秒通過網絡從Producer接收約35MB數據,雖然有Follower從該Broker Fetch數據,但是該Broker基本無讀磁盤。這是因為該Broker直接從Page Cache中將數據取出返回給了Follower。

1.3 支持多Disk Drive

Broker的log.dirs配置項,允許配置多個文件夾。如果機器上有多個Disk Drive,可將不同的Disk掛載到不同的目錄,然後將這些目錄都配置到log.dirs裏。Kafka會盡可能將不同的Partition分配到不同的目錄,也即不同的Disk上,從而充分利用了多Disk的優勢。

2. 零拷貝

Kafka中存在大量的網絡數據持久化到磁盤(Producer到Broker)和磁盤文件通過網絡發送(Broker到Consumer)的過程。這一過程的性能直接影響Kafka的整體吞吐量。

2.1 傳統模式下的四次拷貝與四次上下文切換

以將磁盤文件通過網絡發送為例。傳統模式下,一般使用如下偽代碼所示的方法先將文件數據讀入內存,然後通過Socket將內存中的數據發送出去。

buffer = File.read
Socket.send(buffer)

這一過程實際上發生了四次數據拷貝。首先通過系統調用將文件數據讀入到內核態Buffer(DMA拷貝),然後應用程序將內存態Buffer數據讀入到用戶態Buffer(CPU拷貝),接著用戶程序通過Socket發送數據時將用戶態Buffer數據拷貝到內核態Buffer(CPU拷貝),最後通過DMA拷貝將數據拷貝到NIC Buffer。同時,還伴隨著四次上下文切換,如下圖所示。

技術分享圖片

2.2 sendfile和transferTo實現零拷貝

Linux 2.4+內核通過sendfile系統調用,提供了零拷貝。數據通過DMA拷貝到內核態Buffer後,直接通過DMA拷貝到NIC Buffer,無需CPU拷貝。這也是零拷貝這一說法的來源。除了減少數據拷貝外,因為整個讀文件-網絡發送由一個sendfile調用完成,整個過程只有兩次上下文切換,因此大大提高了性能。零拷貝過程如下圖所示。

技術分享圖片

從具體實現來看,Kafka的數據傳輸通過TransportLayer來完成,其子類PlaintextTransportLayer通過Java NIO的FileChannel的transferTo和transferFrom方法實現零拷貝,如下所示。

@Override
public long transferFrom(FileChannel fileChannel, long position, long count) throws IOException {
    return fileChannel.transferTo(position, count, socketChannel);
}

註: transferTo和transferFrom並不保證一定能使用零拷貝。實際上是否能使用零拷貝與操作系統相關,如果操作系統提供sendfile這樣的零拷貝系統調用,則這兩個方法會通過這樣的系統調用充分利用零拷貝的優勢,否則並不能通過這兩個方法本身實現零拷貝。

3. 減少網絡開銷

3.1 批處理

批處理是一種常用的用於提高I/O性能的方式。對Kafka而言,批處理既減少了網絡傳輸的Overhead,又提高了寫磁盤的效率。

Kafka 0.8.1及以前的Producer區分同步Producer和異步Producer。同步Producer的send方法主要分兩種形式。一種是接受一個KeyedMessage作為參數,一次發送一條消息。另一種是接受一批KeyedMessage作為參數,一次性發送多條消息。而對於異步發送而言,無論是使用哪個send方法,實現上都不會立即將消息發送給Broker,而是先存到內部的隊列中,直到消息條數達到閾值或者達到指定的Timeout才真正的將消息發送出去,從而實現了消息的批量發送。

Kafka 0.8.2開始支持新的Producer API,將同步Producer和異步Producer結合。雖然從send接口來看,一次只能發送一個ProducerRecord,而不能像之前版本的send方法一樣接受消息列表,但是send方法並非立即將消息發送出去,而是通過batch.size和linger.ms控制實際發送頻率,從而實現批量發送。

由於每次網絡傳輸,除了傳輸消息本身以外,還要傳輸非常多的網絡協議本身的一些內容(稱為Overhead),所以將多條消息合並到一起傳輸,可有效減少網絡傳輸的Overhead,進而提高了傳輸效率。

從零拷貝章節的圖中可以看到,雖然Broker持續從網絡接收數據,但是寫磁盤並非每秒都在發生,而是間隔一段時間寫一次磁盤,並且每次寫磁盤的數據量都非常大(最高達到718MB/S)。

3.2 數據壓縮降低網絡負載

Kafka從0.7開始,即支持將數據壓縮後再傳輸給Broker。除了可以將每條消息單獨壓縮然後傳輸外,Kafka還支持在批量發送時,將整個Batch的消息一起壓縮後傳輸。數據壓縮的一個基本原理是,重復數據越多壓縮效果越好。因此將整個Batch的數據一起壓縮能更大幅度減小數據量,從而更大程度提高網絡傳輸效率。

Broker接收消息後,並不直接解壓縮,而是直接將消息以壓縮後的形式持久化到磁盤。Consumer Fetch到數據後再解壓縮。因此Kafka的壓縮不僅減少了Producer到Broker的網絡傳輸負載,同時也降低了Broker磁盤操作的負載,也降低了Consumer與Broker間的網絡傳輸量,從而極大得提高了傳輸效率,提高了吞吐量。

3.3 高效的序列化方式

Kafka消息的Key和Payload(或者說Value)的類型可自定義,只需同時提供相應的序列化器和反序列化器即可。因此用戶可以通過使用快速且緊湊的序列化-反序列化方式(如Avro,Protocal Buffer)來減少實際網絡傳輸和磁盤存儲的數據規模,從而提高吞吐率。這裏要註意,如果使用的序列化方法太慢,即使壓縮比非常高,最終的效率也不一定高。

Kafka設計解析(六)Kafka高性能架構之道