1. 程式人生 > >Kafka原始碼分析及圖解原理之Broker端

Kafka原始碼分析及圖解原理之Broker端

一.前言

  https://www.cnblogs.com/GrimMjx/p/11354987.html

  上一節說過,任何訊息佇列都是萬變不離其宗都是3部分,訊息生產者(Producer)、訊息消費者(Consumer)和服務載體(在Kafka中用Broker指代)。上一節講了kafka producer端的一些細節,那麼這一節來講broker端的一些設計與原理

  首先從kafka如何建立一個topic來開始:

kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

  其中有這麼幾個引數:

  • --zookeeper:zookeeper的地址
  • --replication-factor:副本因子
  • --partitions:分割槽個數(預設是1)
  • --topic:topic名稱

二.什麼是分割槽

  一個topic可以有多個分割槽,每個分割槽的訊息都是不同的。雖然分割槽可以提供更高的吞吐量,但是分割槽不是越多越好。一般分割槽數不要超過kafka叢集的機器數量。分割槽越多佔用的記憶體和檔案控制代碼。一般分割槽設定為3-10個。比如現在叢集有3個機器,要建立一個名為test的topic,分割槽數為2,那麼如圖:

  partiton都是有序切順序不可變的記錄集,並且不斷追加到log檔案,partition中的每一個訊息都回分配一個id,也就是offset(偏移量),offset用來標記分割槽的一條記錄,這裡就用官網的圖了,我畫的不好:

2.1 producer端和分割槽關係

  就圖上的情況,producer端會把mq給哪個分割槽呢?這也是上一節我們提到的一個引數partitioner.class。預設分割槽器的處理是:有key則用murmur2演算法計算key的雜湊值,對總分割槽取模算出分割槽號,無key則輪詢。(org.apache.kafka.clients.producer.internals.DefaultPartitioner#partition)。當然了我們也可以自定義分割槽策略,只要實現org.apache.kafka.clients.producer.Partitioner介面即可:

 1 /**
 2  * Compute the partition for the given record.
 3  *
 4  * @param topic The topic name
 5  * @param key The key to partition on (or null if no key)
 6  * @param keyBytes serialized key to partition on (or null if no key)
 7  * @param value The value to partition on or null
 8  * @param valueBytes serialized value to partition on or null
 9  * @param cluster The current cluster metadata
10  */
11 public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
12     List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
13     int numPartitions = partitions.size();
14     if (keyBytes == null) {
15         int nextValue = nextValue(topic);
16         List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
17         if (availablePartitions.size() > 0) {
18             int part = Utils.toPositive(nextValue) % availablePartitions.size();
19             return availablePartitions.get(part).partition();
20         } else {
21             // no partitions are available, give a non-available partition
22             return Utils.toPositive(nextValue) % numPartitions;
23         }
24     } else {
25         // hash the keyBytes to choose a partition
26         return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
27     }
28 }

2.2 consumer端和分割槽關係

  先來看下官網對於消費組的定義:Consumers label themselves with a consumer group name, and each record published to a topic is delivered to one consumer instance within each subscribing consumer group.

  翻譯:消費者使用一個消費者組名來標記自己,一個topic的訊息會被髮送到訂閱它的消費者組的一個消費者例項上。

  consumer group是用於實現高伸縮性,高容錯性的consumer機制。如果有consumer掛了或者新增一個consumer,consumer group會進行重平衡(rebalance),重平衡機制會在consumer篇具體講解,本節不講。那麼按照上面的圖繼續畫消費者端:

  這裡是最好的情況,2個partition對應1個group中的2個consumer。那麼思考,如果一個消費組的消費者大於分割槽數呢?或者小於分割槽數呢?

  如果一個消費組的消費者大於分割槽數,那麼相當於多餘的消費者是一種浪費,多餘的消費者將無法消費訊息。

  如果一個消費組的消費者小於分割槽數,會有對應的消費者分割槽分配策略。一種是Range(預設),一種是RoundRobin(輪詢),當然也可以自定義策略。其實思想換湯不換藥的啊,每個消費者能負載均衡的工作。具體會在消費者篇講解,這裡不講。

  建議:配置分割槽數是消費者數的整數倍

三.副本與ISR設計

3.1 什麼是副本

  在建立topic的時候有個引數是--replication-factor來設定副本數。Kafka利用多份相同的備份保持系統的高可用性,這些備份在Kafka中被稱為副本(replica)。副本分為3類:

  • leader副本:響應producer端的讀寫請求
  • follower副本:備份leader副本的資料,不響應producer端的讀寫請求!
  • ISR副本集合:包含1個leader副本和所有follower副本(也可能沒有follower副本)

  Kafka會把所有的副本均勻分配到kafka-cluster中的所有broker上,並從這些副本中挑選一個作為leader副本,其他成為follow副本。如果leader副本所在的broker宕機了,那麼其中的一個follow副本就會成為leader副本。leader副本接收producer端的讀寫請求,而follow副本只是向leader副本請求資料不會接收讀寫請求!

3.2 副本同步機制

  上面說了ISR就是動態維護一組同步副本集合,leader副本總是包含在ISR集合中。只有ISR中的副本才有資格被選舉為leader副本。當producer端的ack引數配置為all(-1)時,producer寫入的mq需要ISR所有副本都接收到,才被視為已提交。當然了,上一節就提到了,使用ack引數必須配合broker端的min.insync.replicas(預設是1)引數一起用才能達到效果,該引數控制寫入isr中的多少副本才算成功。如果ISR中的副本數少於min.insync.replicas時,客戶端會返回異常org.apache.kafka.common.errors.NotEnoughReplicasExceptoin: Messages are rejected since there are fewer in-sync replicas than required。

  要了解副本同步機制需要先學習幾個術語:

  • High Watermark:副本高水位值,簡稱HW,小於HW或者說在HW以下的訊息都被認為是“已備份的”,HW指向的也是下一條訊息!leader副本的HW值決定consumer能poll的訊息數量!consumer只能消費小於HW值的訊息!
  • LEO:log end offset,下一條訊息的位移。也就是說LEO指向的位置是沒有訊息的!
  • remote LEO:嚴格來說這是一個集合。leader副本所在broker的記憶體中維護了一個Partition物件來儲存對應的分割槽資訊,這個Partition中維護了一個Replica列表,儲存了該分割槽所有的副本物件。除了leader Replica副本之外,該列表中其他Replica物件的LEO就被稱為remote LEO

  下面舉個一個實際的例子(本例子參考胡夕部落格),該例子中的topic是單分割槽,副本因子是2。也就是說一個leader副本,一個follower副本,ISR中包含這2個副本集合。我們首先看下當producer傳送一條訊息時,leader/follower端broker的副本物件到底會發生什麼事情以及分割槽HW是如何被更新的。首先是初始狀態:

  此時producer給該topic分割槽傳送了一條訊息。此時的狀態如下圖所示:

  如上圖所見,producer傳送訊息成功後(假設acks=1, leader成功寫入即返回),follower發來了新的FECTH請求,依然請求fetchOffset = 0的資料。和上次不同的是,這次是有資料可以讀取的,因此整個處理流程如下圖:

   顯然,現在leader和follower都儲存了位移是0的這條訊息,但兩邊的HW值都沒有被更新,它們需要在下一輪FETCH請求處理中被更新,如下圖所示:

  簡單解釋一下, 第二輪FETCH請求中,follower傳送fetchOffset = 1的FETCH請求——因為fetchOffset = 0的訊息已經成功寫入follower本地日誌了,所以這次請求fetchOffset = 1的資料了。Leader端broker接收到FETCH請求後首先會更新other replicas中的LEO值,即將remote LEO更新成1,然後更新分割槽HW值為1——具體的更新規則參見上面的解釋。做完這些之後將當前分割槽HW值(1)封裝進FETCH response傳送給follower。Follower端broker接收到FETCH response之後從中提取出當前分割槽HW值1,然後與自己的LEO值比較,從而將自己的HW值更新成1,至此完整的HW、LEO更新週期結束。

3.3 ISR維護  

  在0.9.0.0版本之後,只有一個引數:replica.lag.time.max.ms來判定該副本是否應該在ISR集合中,這個引數預設值為10s。意思是如果一個follower副本響應leader副本的時間超過10s,kafka會認為這個副本走遠了從同步副本列表移除。

四.日誌設計

  Kafka的每個主題相互隔離,每個主題可以有一個或者多個分割槽,每個分割槽都有記錄訊息資料的日誌檔案:

   圖中有個demo-topic的主題,這個topic有8個分割槽,每一個分割槽都存在[topic-partition]命名的訊息日誌檔案。在分割槽日誌檔案中,可以看到字首一樣,但是檔案型別不一樣的幾個檔案。比如圖中的3個檔案,(00000000000000000000.index、00000000000000000000.timestamp、00000000000000000000.log)。這稱之為一個LogSegment(日誌分段)。

4.1 LogSegment

  以一個測試環境的具體例子來講,一個名為ALC.ASSET.EQUITY.SUBJECT.CHANGE的topic,我們看partition0的日誌檔案:

  每一個LogSegment都包含一些檔名一致的檔案集合。檔名的固定是20位數字,如果檔名是00000000000000000000代表當前LogSegment的第一條訊息的offset(偏移量)為0,如果檔名是00000000000000000097代表當前LogSegment的第一條訊息的offset(偏移量)為97。日誌檔案有多種字尾的檔案,重點關注.index、.timestamp、.log三種類型檔案即可。

  • .index:偏移量索引檔案
  • .timeindex:時間索引檔案
  • .log:日誌檔案
  • .snapshot:快照檔案
  • .swap:Log Compaction之後的臨時檔案

4.2 索引與日誌檔案

  kafka有2種索引檔案,第一種是offset(偏移量)索引檔案,也就是.index結尾的檔案。第二種是時間戳索引檔案,也就是.timeindex結尾的檔案。

  我們可以用kafka-run-class.sh來檢視offset(偏移量)索引檔案的內容:

  可以看到每一行都是offset:xxx  position:xxxx。這兩者沒有直接關係。

  • offset:相對偏移量
  • position:實體地址

  那麼第一行的offset:12 position:4423是什麼意思呢?它代表偏移量從0-12的訊息的實體地址在0-4423。

  同理第二行的offset:24 position:8773的意思也能猜得出來:它代表偏移量從13-24的訊息的實體地址在4424-8773。

  我們可以再用kafka-run-class.sh來看下.log檔案的檔案內容,關注裡面的baseOffset和postion的值。你看看和上面說的對應的上嗎。

4.3 如何用offset查詢 

  按上面的例子,如何查詢偏移量為60的訊息

  1. 根據offset首先找到對應的LogSegment,這裡找到00000000000000000000.index
  2. 通過二分法找到不大於offset的最大索引項,這裡找到offset:24 position:8773
  3. 開啟00000000000000000000.log檔案,從position為8773的那個地方開始順序掃描直到找到offset=60的訊息

 

 

 

參考文件:

http://kafka.apachecn.org/documentation.html#introduction

https://www.cnblogs.com/huxi2b/p/9579681.html

《Apache Kafka實戰》

&n