1. 程式人生 > >訊息佇列之kafka(核心架構)

訊息佇列之kafka(核心架構)

1. Kafka的經典架構

訊息佇列之kafka(核心架構)
 Kafka是LinkedIn 用於日誌處理的分散式訊息佇列,同時支援離線和線上日誌處理。
 Kafka 對訊息儲存時根據 Topic 進行歸類。
 傳送訊息者就是Producer,訊息的釋出描述為Producer
 訊息接受者就是 Consumer,訊息的訂閱描述為 Consumer
 每個 Kafka 例項稱為 Broker,將中間的儲存陣列稱作 Broker(代理),Broker也是kafka叢集的節點

2.架構的角色介紹

  (1)broker

  kafka叢集包括一個或者多個伺服器,這種伺服器被稱為brker。
  broker也就是中間的儲存佇列的節點例項。我們將訊息釋出者稱為:Produce,將訊息的訂閱者稱為:Consumer,將中間的儲存陣列稱為broker。

  (2)topic

  每條釋出到kafka叢集的訊息都有一個類別,這個類別被成為Tpoic。物理上不同的topic的訊息分開儲存,邏輯上一個topic的訊息雖然儲存與一個或者多個broker中。但使用者只需要指定消費的topic,即生產或者消費資料的客戶端不需要關心資料儲存與何處。
  kafka中釋出訂閱的物件就是topic。為每一個數據型別建立一個topic,把向topic釋出訊息的客戶端稱為producer,從topic訂閱訊息的客戶端稱為consumer,producer和consumer可以同時從多個topic讀寫資料。一個kafka叢集由一個或者多個broker伺服器組成。他負責持久化和備份具體的kafka訊息。
  topic就是資料的主題,是資料記錄釋出的地方,可以用來區分業務系統。kafka中的topics總是多訂閱者模式

,一個topic可以擁有一個或者多個消費者來訂閱它的資料。

  (3)partition

訊息佇列之kafka(核心架構)
  partition是物理的概念,每一個topic包含一個或者多個partition。
  topic的分割槽策略(針對寫資料的時候進行分割槽):
    - 輪詢:順序分發,僅針對於message沒有key的時候。
    - Hash分割槽:在message有key的情況下,(key.hash%分割槽個數)。如果在增加分割槽的時候,partition裡面的message不會重新進行分配,隨著資料的繼續寫入,這個新的分割槽才會參與load balance。


  topic的分割槽邏輯儲存方式
訊息佇列之kafka(核心架構)


   topic 會分成一個或多個 partition,每個 partiton 相當於是一個 子 queue。在物理結構上,每個 partition 對應一個物理的目錄(資料夾),資料夾命名是 [topicname][partition][序號],一個 topic 可以有無數多的 partition,根據業務需求和資料量 來設定。在 kafka 配置檔案中可隨時更高 num.partitions 引數來配置更改 topic 的 partition 數 量,在建立 Topic 時通過引數指定 parittion 數量。Topic 建立之後通過 Kafka 提供的工具也可以修改 partiton 數量。分割槽中存放著資料本身和資料的index下標。在向partition寫入資料的時候,是順序寫入的,每一個數據寫入的時候都會有一個類似下標的東西(index),隨著資料的寫入而增長。partition也是叢集負載均衡的基本單位。


  總結
    - 一個topic的partition數量大於等於broker的數量,可以提高吞吐率。
    - 同一個partition的Replica儘量分散到不同的機器上,高可用。
    - kafka的分割槽數:(1|2|3 + 0.95) * broker數量

  (4)Producer

   負責主動釋出訊息到kakfa broker(push)
   kafka訊息的儲存策略:每個 Topic 被分成多個 partition(區)。每條訊息在 partition 中的位置稱為 offset(偏移量),型別為 long 型數字。訊息即使被消費了,也不會被立即刪除, 而是根據 broker 裡的設定(基於時間儲存或者基於大小),儲存一定時間後再清除,比如 log 檔案設定儲存兩天,則兩天後, 不管訊息是否被消費,都清除。

  (5)Consumer

   訊息消費者,向kafkabroker讀取訊息的客戶端。(pull)
   消費訊息的策略:(使用的是roundrabin演算法):如果有4個分割槽,現在有三個消費者執行緒,那麼這個三個執行緒一人分一個分割槽消費,最後一個分割槽以輪詢的方式,傳送給第一個執行緒消費,如果此時又多加入一個執行緒,那麼就會將第4個分割槽就分給新加入的執行緒消費,如果有一個執行緒退出,那麼第三個和第四個分割槽也會以輪詢的方式,傳送給第一個執行緒和第二個執行緒消費。(kafka內部自動維護這個負載均衡)。
   消費的原則:一個consumer對一個partition中的一條資料只需要消費一次,每一個consumer組維護一個下標檔案,叫做offset,這個offset用於記錄當前的consumer組消費資料的下標,每進行消費一條資料,當前的offset就會遞增1(offset之前的資料,都表示已經消費過的資料)。

  (6)Consumer group

訊息佇列之kafka(核心架構)
   一個consumer group 包含多個consumer,這個是預先在配置檔案中配置好的。各個consumer可以組成一個租,partition中的每一個message只能被一個組中的一個consumer進行消費,其他的consumer不能消費同一個topic中同一個分割槽的資料,不同組的consumer可以消費同一個topic的同一個分割槽的資料。
    廣播和單播
     廣播:所有的consumer每一個consumer劃分一組
     單播:所有的consumer劃分一組(一組中只允許一個消費)
    對於kafka消費的總結
      - 一個分割槽只能被一個消費者組中的一個成員消費
      - 一個成員可以消費一個topic的多個分割槽
      - 一個 Topic 中的每個 Partition 只會被一個“Consumer group”中的一個 Consumer 消費
      - 一個成員還可以消費另外一個topic的分割槽

  (7)segment

    在kafka檔案儲存找中,同一個topic下有多個partition,每一個partition為一個目錄,partition命名規則為:topic 名稱+有序序號,第一個partition序號從0開始,序號最大值為partitions數量-1,partition物理上由多個segment組成,每一個segment儲存著多個message資訊(預設是:1G),而每一個message是由一個key-value和一個時間戳組成。
    segment檔案的生命週期由伺服器配置引數決定:預設的是168個小時後刪除。
    segment由兩大部分組成: index filedata file,這2個檔案一一對應,成對出現,字尾".index"和".log"分別表示為 segment 索引檔案、資料檔案。
訊息佇列之kafka(核心架構)
    segment的命名規則:partion 全域性的第一個 segment 從 0 開始,後續每個 segment 檔名為上一個 segment 檔案最後一條訊息的 offset 值。數值最大為 64 位 long 大小,19 位數字字元長度,沒有數字用 0 填充。(每一個partition都是如此)
    segment的index file: 索引檔案儲存大量元資料,資料檔案儲存大量訊息,索引檔案中元資料指向對應資料檔案中 message 的物理偏移地址。
訊息佇列之kafka(核心架構)
    segment的data file
訊息佇列之kafka(核心架構)


kafka讀取資料的查詢message的步驟
以讀取 offset=368776 的 message,需要通過下面 2 個步驟查詢。
訊息佇列之kafka(核心架構)
第一步:00000000000000000000.index,表示最開始的檔案,起始偏移量(offset)為 0,00000000000000368769.index 的訊息量起始偏移量為 368770 = 368769 + 1,00000000000000737337.index 的起始偏移量為 737338=737337 + 1,其他後續檔案依次類推。以起始偏移量命名並排序這些檔案,只要根據 offset 二分查詢檔案列表,就可以快速定 位到具體檔案。當 offset=368776 時定位到 00000000000000368769.index 和對應 log 檔案。
第二步:當 offset=368776 時,依次定位到 00000000000000368769.index 的元資料物理位置和 00000000000000368769.log 的物理偏移地址,然後再通過 00000000000000368769.log 順序查詢直到 offset=368776 為止。查詢的時候是通過相對偏移量,在.index檔案中有兩列(序列,地址),其中序列是相對偏移量:序列=查詢的message的偏移量-當前檔案的起始偏移量 ,然後根據序列對應的地址,找到相應的位置上的資料message。