1. 程式人生 > >kafka系列四、kafka架構原理

kafka系列四、kafka架構原理

一、概述

  Kakfa起初是由LinkedIn公司開發的一個分散式的訊息系統,後成為Apache的一部分,它使用Scala編寫,以可水平擴充套件和高吞吐率而被廣泛使用。目前越來越多的開源分散式處理系統如Cloudera、Apache Storm、Spark等都支援與Kafka整合。     Kafka憑藉著自身的優勢,越來越受到網際網路企業的青睞,唯品會也採用Kafka作為其內部核心訊息引擎之一。Kafka作為一個商業級訊息中介軟體,訊息可靠性的重要性可想而知。 如何確保訊息的精確傳輸?如何確保訊息的準確儲存?如何確保訊息的正確消費?這些都是需要考慮的問題。本文首先從Kafka的架構著手,先了解下Kafka的基本原理,然後通過對kakfa的儲存機制、複製原理、同步原理、可靠性和永續性保證等等一步步對其可靠性進行分析,最後通過benchmark來增強對Kafka高可靠性的認知。

二、Kafka的使用場景

(1)日誌收集:一個公司可以用Kafka可以收集各種服務的log,通過kafka以統一介面服務的方式開放給各種consumer,例如Hadoop、Hbase、Solr等; (2)訊息系統:解耦和生產者和消費者、快取訊息等; (3)使用者活動跟蹤:Kafka經常被用來記錄web使用者或者app使用者的各種活動,如瀏覽網頁、搜尋、點選等活動,這些活動資訊被各個伺服器釋出到kafka的topic中,然後訂閱者通過訂閱這些topic來做實時的監控分析,或者裝載到Hadoop、資料倉庫中做離線分析和挖掘; (4)運營指標:Kafka也經常用來記錄運營監控資料。包括收集各種分散式應用的資料,生產各種操作的集中反饋,比如報警和報告; (5)流式處理:比如spark streaming和storm; (6)事件源;

三、Kafka基本架構

  如上圖所示,一個典型的Kafka體系架構包括:

  • 若干Producer(可以是伺服器日誌,業務資料,頁面前端產生的page view等等),
  • 若干broker(Kafka支援水平擴充套件,一般broker數量越多,叢集吞吐率越高),
  • 若干Consumer (Group),以及一個Zookeeper叢集。

  Kafka通過Zookeeper管理叢集配置,選舉leader,以及在consumer group發生變化時進行rebalance。Producer使用push(推)模式將訊息釋出到broker,Consumer使用pull(拉)模式從broker訂閱並消費訊息。

1、Topic & Partition

    一個topic可以認為一個一類訊息,每個topic將被分成多個partition,每個partition在儲存層面是append log檔案。任何釋出到此partition的訊息都會被追加到log檔案的尾部,每條訊息在檔案中的位置稱為offset(偏移量),offset為一個long型的數字,它唯一標記一條訊息。每條訊息都被append到partition中,是順序寫磁碟,因此效率非常高(經驗證,順序寫磁碟效率比隨機寫記憶體還要高,這是Kafka高吞吐率的一個很重要的保證)。

  每一條訊息被髮送到broker中,會根據partition規則選擇被儲存到哪一個partition。partition機制可以通過指定producer的partition.class這一引數來指定,該class必須實現kafka.producer.Partitioner介面。如果partition規則設定的合理,所有訊息可以均勻分佈到不同的partition裡,這樣就實現了水平擴充套件。(如果一個topic對應一個檔案,那這個檔案所在的機器I/O將會成為這個topic的效能瓶頸,而partition解決了這個問題)。在建立topic時可以在$KAFKA_HOME/config/server.properties中指定這個partition的數量(如下所示),當然可以在topic建立之後去修改partition的數量。

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
#預設partitions數量 num.partitions
=1

四、高可靠性儲存分析

  Kafka的高可靠性的保障來源於其健壯的副本(replication)策略。通過調節其副本相關引數,可以使得Kafka在效能和可靠性之間運轉的遊刃有餘。Kafka從0.8.x版本開始提供partition級別的複製,replication的數量可以在$KAFKA_HOME/config/server.properties中配置(default.replication.refactor)。     這裡先從Kafka檔案儲存機制入手,從最底層瞭解Kafka的儲存細節,進而對其的儲存有個微觀的認知。之後通過Kafka複製原理和同步方式來闡述巨集觀層面的概念。最後從ISR,HW,leader選舉以及資料可靠性和永續性保證等等各個維度來豐富對Kafka相關知識點的認知。

1、 Kafka檔案儲存機制

  Kafka中訊息是以topic進行分類的,生產者通過topic向Kafka broker傳送訊息,消費者通過topic讀取資料。然而topic在物理層面又能以partition為分組,一個topic可以分成若干個partition,那麼topic以及partition又是怎麼儲存的呢?partition還可以細分為segment,一個partition物理上由多個segment組成,那麼這些segment又是什麼呢?下面我們來一一揭曉。     為了便於說明問題,假設這裡只有一個Kafka叢集,且這個叢集只有一個Kafka broker,即只有一臺物理機。在這個Kafka broker中配置($KAFKA_HOME/config/server.properties中)log.dirs=/tmp/kafka-logs,以此來設定Kafka訊息檔案儲存目錄,與此同時建立一個topic:topic_zzh_test,partition的數量為4($KAFKA_HOME/bin/kafka-topics.sh –create –zookeeper localhost:2181 –partitions 4 –topic topic_vms_test –replication-factor 4)。那麼我們此時可以在/tmp/kafka-logs目錄中可以看到生成了4個目錄:
drwxr-xr-x 2 root root 4096 Apr 10 16:10 topic_zzh_test-0 
drwxr-xr-x 2 root root 4096 Apr 10 16:10 topic_zzh_test-1 
drwxr-xr-x 2 root root 4096 Apr 10 16:10 topic_zzh_test-2 
drwxr-xr-x 2 root root 4096 Apr 10 16:10 topic_zzh_test-3  
  在Kafka檔案儲存中,同一個topic下有多個不同的partition,每個partiton為一個目錄,partition的名稱規則為: topic名稱+有序序號,第一個序號從0開始計,最大的序號為partition數量減1, partition是實際物理上的概念,而topic是邏輯上的概念。     上面提到partition還可以細分為segment,這個segment又是什麼?如果就以partition為最小儲存單位,我們可以想象當Kafka producer不斷髮送訊息,必然會引起partition檔案的無限擴張,這樣對於訊息檔案的維護以及已經被消費的訊息的清理帶來嚴重的影響,所以這裡以segment為單位又將partition細分。每個partition(目錄) 相當於一個巨型檔案被平均分配到多個大小相等的segment(段)資料檔案中(每個segment 檔案中訊息數量不一定相等)這種特性也方便old segment的刪除,即方便已被消費的訊息的清理,提高磁碟的利用率。每個partition只需要支援順序讀寫就行,segment的檔案生命週期由服務端配置引數 (log.segment.bytes,log.roll.{ms,hours}等若干引數)決定。
 #在強制重新整理資料到磁碟允許接收訊息的數量
#log.flush.interval.messages=10000

# 在強制重新整理之前,訊息可以在日誌中停留的最長時間
#log.flush.interval.ms=1000

#一個日誌的最小存活時間,可以被刪除
log.retention.hours=168

#  一個基於大小的日誌保留策略。段將被從日誌中刪除只要剩下的部分段不低於log.retention.bytes。
#log.retention.bytes=1073741824

#  每一個日誌段大小的最大值。當到達這個大小時,會生成一個新的片段。
log.segment.bytes=1073741824

# 檢查日誌段的時間間隔,看是否可以根據保留策略刪除它們
log.retention.check.interval.ms=300000

   segment檔案由兩部分組成,分別為“.index”檔案和“.log”檔案,分別表示為segment索引檔案和資料檔案。這兩個檔案的命令規則為:partition全域性的第一個segment從0開始,後續每個segment檔名為上一個segment檔案最後一條訊息的offset值,數值大小為64位,20位數字字元長度,沒有數字用0填充,如下:

00000000000000000000.index 
00000000000000000000.log 
00000000000000170410.index 
00000000000000170410.log 
00000000000000239430.index 
00000000000000239430.log  

以上面的segment檔案為例,展示出segment:00000000000000170410的“.index”檔案和“.log”檔案的對應的關係,如下圖:

 

 如上圖,“.index”索引檔案儲存大量的元資料,“.log”資料檔案儲存大量的訊息,索引檔案中的元資料指向對應資料檔案中message的物理偏移地址。其中以“.index”索引檔案中的元資料[3, 348]為例,在“.log”資料檔案表示第3個訊息,即在全域性partition中表示170410+3=170413個訊息,該訊息的物理偏移地址為348。

那麼如何從partition中通過offset查詢message呢? 以上圖為例,讀取offset=170418的訊息,首先查詢segment檔案,其中00000000000000000000.index為最開始的檔案,第二個檔案為00000000000000170410.index(起始偏移為170410+1=170411),而第三個檔案為00000000000000239430.index(起始偏移為239430+1=239431),所以這個offset=170418就落到了第二個檔案之中。其他後續檔案可以依次類推,以其實偏移量命名並排列這些檔案,然後根據二分查詢法就可以快速定位到具體檔案位置。其次根據00000000000000170410.index檔案中的[8,1325]定位到00000000000000170410.log檔案中的1325的位置進行讀取。 要是讀取offset=170418的訊息,從00000000000000170410.log檔案中的1325的位置進行讀取,那麼怎麼知道何時讀完本條訊息,否則就讀到下一條訊息的內容了? 這個就需要聯絡到訊息的物理結構了,訊息都具有固定的物理結構,包括:offset(8 Bytes)、訊息體的大小(4 Bytes)、crc32(4 Bytes)、magic(1 Byte)、attributes(1 Byte)、key length(4 Bytes)、key(K Bytes)、payload(N Bytes)等等欄位,可以確定一條訊息的大小,即讀取到哪裡截止。