1. 程式人生 > >分散式訊息通訊Kafka原理分析(二)

分散式訊息通訊Kafka原理分析(二)

本章重點:

1.訊息的儲存原理
2.Partition的副本機制原理
3.副本資料同步原理

訊息的檔案儲存機制

通過如 下命令找到對應partition下的日誌內容 

[root@localhost ~]# ls /tmp/kafka-logs/firstTopic-1/ 00000000000000000000.index 
00000000000000000000.log  00000000000000000000.timeindex  leader-epochcheckpoint 

Kafka是通過分段的方式將Log分為多個LogSegment,LogSegment是一個邏輯上的概念,一個LogSegment對應磁碟上的一個日誌檔案索引檔案,其中日誌檔案是用來記錄訊息的。索引檔案是用來儲存訊息的索引。

LogSegment是什麼

Kafka以Segment為單位把Partition進行細分,每個Partition相當於一個巨型檔案被平均分配到多個大小相等的segment資料檔案中(每個segment檔案中的訊息不一定相等),這種特性方便已經被消費的訊息的清理,提高磁碟的利用率。

log.segment.bytes=107370(設定分段大小),預設是1gb,我們把這個值調小以後,可以看到日誌分段的效果。

抽取其中3個分段來進行分析

segment file 由 2 大部分組成,分別為 index file 和 data file,此 2個檔案一一對應,成對出現,字尾".index"和“.log” 分別表示為segment索引檔案、資料檔案.。

segment 檔案命名規則:partion 全域性的第一個 segment 從 0 開始,後續每個 segment 檔名為上一個 segment 檔案最後一條訊息的offset值進行遞增。數值最大為64位 long大小,20位數字字元長度,沒有數字用0填充 。

檢視segment檔案命名規則

sh kafka-run-class.sh kafka.tools.DumpLogSegments -files /tmp/kafka-logs/test0/00000000000000000000.log --print-data-log 
輸出結果為: 
offset: 5376 position: 102124 CreateTime: 1531477349287 isvalid: true keysize: -1 valuesize: 12 magic: 2 compresscodec: NONE producerId: -1 producerEpoch:1 sequence: -1 isTransactional: false headerKeys: [] payload: message_5376 

第一個log檔案的最後一個offset為:5376,所以下一個segment的檔案命名為:

00000000000000005376.log

00000000000000005376.index 

segment中的index和log的對應關係

從所有分段中,找一個分段進行分析 為了提高查詢訊息的效能,為每一個日誌檔案新增 2 個索 引索引檔案:OffsetIndex 和 TimeIndex,分別對應*.index 以及*.timeindex。

TimeIndex索引檔案格式:它是對映時間戳和相對offset 。

查 看 索 引 內 容 : 
sh kafka-run-class.sh kafka.tools.DumpLogSegments --files /tmp/kafkalogs/test-0/00000000000000000000.index --print-datalog 

如圖所示:index中儲存了索引以及物理偏移量。log儲存了訊息的內容,索引檔案的元資料執行對應資料檔案中message的物理偏移位置。

舉個簡單的案例來說,以 [4053,80899]為例,在log檔案中,對應的是第4053條記 錄,物理偏移量(position)為 80899.  position 是 ByteBuffer的指標位置 。

在partition中如何通過offset查詢message

查詢演算法是:

1.根據offset的值,查詢segment段中的索引檔案。由於索引檔案命名是以上一個檔案的最後一個offset進行命名的,所以,使用二分查詢演算法能夠根據offset快速定位到指定的索引檔案。
2.找到索引檔案後,根據offset進行定位,找到索引檔案中的符合範圍的索引。(kafka採用稀疏索引的方式來提高查詢效能)。
3.得到position以後,再到對應的log檔案中,從position處開始查詢offset對應的訊息。將每條訊息的offset與目標offset進行比較,直到找到訊息。

比如說,我們要查詢 offset=2490 這條訊息,那麼先找到 00000000000000000000.index, 然後找到[2487,49111]這 個索引,再到 log 檔案中,根據 49111 這個 position 開始 查詢,比較每條訊息的offset是否大於等於2490。最後查 找到對應的訊息以後返回 。

Log檔案的訊息內容分析

檢視二進位制日誌檔案資訊,一條訊息,包含很多的欄位。

offset: 5371 position: 102124 CreateTime: 1531477349286 isvalid: true keysize: -1 valuesize: 12 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: 1 sequence: -1 isTransactional: false headerKeys: [] payload: message_5371 
offset和position這兩個前面已經講過了、 createTime表示建立時間、keysize 和 valuesize 表示 key 和 value 的大 小、 compresscodec表示壓縮編碼、payload:表示訊息的具體內容 

日誌的清除策略以及壓縮策略

日誌清除策略

前面提到過,日誌是分段儲存,一方面能夠減少單個檔案內容大小,另外一方面,方便Kafka進行日誌清理。日誌的清理策略有兩個:

1.根據訊息的保留時間,當訊息在Kafka中儲存的時間超過指定時間,就會觸發清理過程
2.根據topic儲存的資料大小,當topic所佔的日誌檔案大小大於一定的閾值,則可以開始刪除最舊的訊息。Kafka會啟動一個後臺執行緒,定期檢查是否存在可以刪除的訊息。

通過log.retention.bytes和log.retention.hours這兩個引數來設定,當其中任意一個達到要求,都會執行刪除。預設的保留時間是7天。

日誌壓縮策略

Kafka還提供了日誌壓縮(Log Compaction)功能,可以有效地減少日誌檔案地大小,緩解磁碟緊張地情況,在很多實際場景中,訊息的 key 和 value 的值 之間的對應關係是不斷變化的,就像資料庫中的資料會不斷被修改一樣,消費者只關心key對應的最新的value。

因 此,我們可以開啟 kafka 的日誌壓縮功能,服務端會在後臺啟動啟動Cleaner執行緒池,定期將相同的key進行合併, 只保留最新的value值。日誌的壓縮原理是 
 
partition的高可用副本策略

我們已經知道Kafka的每個topic都可以分為多個Partition, 並且多個partition會均勻分佈在叢集的各個節點下。雖然 這種方式能夠有效的對資料進行分片,但是對於每個 partition來說,都是單點的,當其中一個partition不可用 的時候,那麼這部分訊息就沒辦法消費。所以 kafka 為了提高partition的可靠性而提供了副本的概念(Replica),通過副本機制來實現冗餘備份。

每個分割槽可以有多個副本,並且在副本集合中會存在一個 leader的副本,所有的讀寫請求都是由leader副本來進行 處理。剩餘的其他副本都做為 follower 副本,follower 副本會從 leader 副本同步訊息日誌。這個有點類似 zookeeper中leader和follower的概念,但是具體的時間 方式還是有比較大的差異。所以我們可以認為,副本集會存在一主多從的關係。 一般情況下,同一個分割槽的多個副本會被均勻分配到叢集 中的不同broker上,當leader副本所在的broker出現故障後,可以重新選舉新的 leader 副本繼續對外提供服務。 通過這樣的副本機制來提高kafka叢集的可用性。

副本分配演算法

1.將所有N Broker和待分配的i個partition排序
2.將第i個partition分配到第(i mod n)個Broker上
3.將第i個partition的第j個副本分配到第((i+j)mod n)個broker上

 建立一個帶副本機制的topic

./kafka-topics.sh --create --zookeeper 192.168.11.156:2181 --replication-factor 2 --partitions 3 -topic secondTopic 

然後我們可以在/tmp/kafka-log 路徑下看到對應 topic 的 副本資訊了。我們通過一個圖形的方式來表達。  針對 secondTopic 這個 topic 的 3 個分割槽對應的 3 個副 本 
 

如何知道那個各個分割槽中對應的leader是誰呢? 在zookeeper伺服器上,通過如下命令去獲取對應分割槽的 資訊, 比如下面這個是獲取 secondTopic 第 1 個分割槽的狀 態資訊。 

get /brokers/topics/secondTopic/partitions/1/state 

{"controller_epoch":12,"leader":0,"version":1,"leader_ep och":0,"isr":[0,1]} leader

leader表示當前分割槽的leader是那個broker-id。下圖中:綠色線條的表示該分割槽中的 leader 節點。其他節點就為 follower 
 

Kafka 提供了資料複製演算法保證,如果 leader 發生故障或 掛掉,一個新leader被選舉並被接受客戶端的訊息成功寫 入。Kafka確保從同步副本列表中選舉一個副本為leader; leader 負責維護和跟蹤 ISR(in-Sync replicas , 副本同步佇列)中所有 follower 滯後的狀態。

當 producer 傳送一條 訊息到broker後,leader寫入訊息並複製到所有follower。 訊息提交之後才被成功複製到所有的同步副本。

既然有副本機制,就一定涉及到資料同步的概念,那接 下來分析下資料是如何同步的? 需要注意的是,不要把 zookeeper 的 leader 和 follower 的同步機制和 kafka 副本的同步機制搞混了。雖然從思想層面來說是一樣的,但是原理層面的實現是完全 不同的。 

Kafka副本機制中的幾個概念

Kafka 分割槽下有可能有很多個副本(replica)用於實現冗餘, 從而進一步實現高可用。副本根據角色的不同可分為3類:

  1. leader副本:響應clients端讀寫請求的副本
  2. follower副本:被動地備份leader副本中的資料,不能響應clients端讀寫請求。
  3. ISR副本:包含了leader副本和所有與leader副本保持同步的 follower 副本——如何判定是否與 leader 同步後面會提到每個 Kafka 副本物件都有兩個重要的屬性:LEO 和 HW。注意是所有的副本,而不只是leader副本。 

LEO:即日誌末端位移(log end offset),記錄了該副本底層 日誌(log)中下一條訊息的位移值。注意是下一條訊息!也就是說,如果LEO=10,那麼表示該副本儲存了10條訊息, 位移值範圍是[0, 9]。另外,leader LEO和follower LEO的 更新是有區別的。我們後面會詳細說 HW:即上面提到的水位值。對於同一個副本物件而言,其 HW值不會大於LEO值。小於等於HW值的所有訊息都被 認為是“已備份”的(replicated)。同理,leader 副本和 follower副本的HW更新是有區別的 。

副本協同機制

剛剛提到了,訊息的讀寫操作都只會由leader節點來接收 和處理。follower副本只負責同步資料以及當leader副本所在的 broker 掛了以後,會從 follower 副本中選取新的 leader。

寫請求首先由 Leader 副本處理,之後 follower 副本會從 leader 上拉取寫入的訊息,這個過程會有一定的延遲,導致follower副本中儲存的訊息略少於leader副本,但是隻要沒有超出閾值都可以容忍。但是如果一個 follower 副本出現異常,比如宕機、網路斷開等原因長時間沒有同步到 訊息,那這個時候,leader就會把它踢出去。kafka通過ISR 集合來維護一個分割槽副本資訊 

ISR 
ISR 表示目前“可用且訊息量與 leader 相差不多的副本集合, 這是整個副本集合的一個子集”。怎麼去理解可用和相差不多 這兩個詞呢?具體來說,ISR集合中的副本必須滿足兩個條件 

1.副本所在節點必須維持著與Zookeeper的連線
2.副本最後一條訊息的offset與leader副本的最後一條訊息的offset之間的差值不能超過指定的閾值(replica.lag.time.max.ms):如果該follower在此時間間隔內一直沒有追上過leader的所有訊息,則follower就會被剔除isr列表。

ISR 數 據 保 存 在 Zookeeper 的 /brokers/topics/<topic>/partitions/<partitionId>/stat e節點中 

HW&LEO

關於follower副本同步的過程中,還有兩個關鍵的概念, HW(HighWatermark)和 LEO(Log End Offset). 這兩個引數跟ISR集合緊密關聯。HW標記了一個特殊的offset,當 消費者處理訊息的時候,只能拉去到HW之前的訊息,HW 之後的訊息對消費者來說是不可見的。也就是說,取 partition 對應 ISR 中最小的 LEO 作為 HW,consumer 最多隻能消費到 HW 所在的位置。每個 replica 都有 HW, leader和follower各自維護更新自己的HW的狀態。一條 訊息只有被 ISR 裡的所有 Follower 都從 Leader 複製過去 才會被認為已提交。這樣就避免了部分資料被寫進了 Leader,還沒來得及被任何Follower複製就宕機了,而造 成資料丟失(Consumer 無法消費這些資料)。而對於 Producer 而言,它可以選擇是否等待訊息 commit,這可 以通過acks來設定。這種機制確保了只要ISR有一個或以 上的Follower,一條被commit的訊息就不會丟失。

資料的同步過程

瞭解了副本的協同過程以後,還有一個最重要的機制,就 是資料的同步過程。它需要解決

  1. 怎麼傳播訊息
  2. 在向訊息傳送端返回 ack 之前需要保證多少個 Replica 已經接收到這個訊息

資料的處理過程是 Producer 在釋出訊息到某個 Partition 時,先通過ZooKeeper 找到該 Partition 的 Leader 【 get /brokers/topics/<topic>/partitions/2/state】,然後無論該 Topic的Replication Factor為多少(也即該Partition有多 少個Replica),Producer只將該訊息傳送到該Partition的 Leader。Leader會將該訊息寫入其本地Log。每個Follower 都從Leader pull資料。這種方式上,Follower儲存的資料 順序與Leader保持一致。Follower在收到該訊息並寫入其 Log後,向Leader傳送ACK。一旦Leader收到了ISR中 的所有Replica的ACK,該訊息就被認為已經commit了, Leader 將增加 HW(HighWatermark)並且向 Producer 傳送 ACK。 

初始狀態
初始狀態下,leader 和 follower 的 HW 和 LEO 都是 0, leader副本會儲存remote LEO,表示所有follower LEO, 也會被初始化為0,這個時候,producer沒有傳送訊息。 follower會不斷地給leader傳送fetch請求,但是因為沒有資料,這個請求會被leader寄存,當在指定的時間之後會強制完成請求 , 這個時間配置是 (replica.fetch.wait.max.ms),如果在指定時間內 producer 有訊息傳送過來,那麼kafka會喚醒fetch請求,讓leader 繼續處理 。

這裡會分兩種情況:

  1. 第一種是 leader 處理完 producer 請求之後,follower 傳送一個 fetch 請求過來;
  2. 第二種是 follower 阻塞在 leader 指定時間之內,leader 副本收到 producer的請求。

第一種情況:

leader副本收到請求以後,會做幾件事情

  1.  把訊息追加到log檔案,同時更新leader副本的LEO
  2.  嘗試更新leader HW值。這個時候由於follower副本還 沒有傳送fetch請求,那麼leader的remote LEO仍然 是0。leader會比較自己的LEO以及remote LEO的值 發現最小值是0,與HW的值相同,所以不會更新HW 

follower fetch 訊息 

follower 傳送fetch請求,leader副本的處理邏輯是:

  1. 讀取log資料、更新remote LEO=0(follower還沒有寫入這條訊息,這個值是根據 follower 的 fetch 請求中的 offset來確定的)
  2. 嘗試更新 HW,因為這個時候 LEO 和 remoteLEO 還是 不一致,所以仍然是HW=0
  3. 把訊息內容和當前分割槽的HW值傳送給follower副本 follower副本收到response以後
    1. 1將訊息寫入到本地log,同時更新follower的LEO

    2.  

      1更新 follower HW,本地的 LEO 和 leader 返回的 HW 進行比較取小的值,所以仍然是0 第一次互動結束以後,HW仍然還是0,這個值會在下一次 follower發起fetch請求時被更新 

       

       

follower發第二次fetch請求,leader收到請求以後

  1. 1. 讀取log資料
  2. 2. 更新remote LEO=1, 因為這次fetch攜帶的offset是 1.
  3. 3. 更新當前分割槽的HW,這個時候leader LEO和remote LEO都是1,所以HW的值也更新為1 4. 把資料和當前分割槽的HW值返回給follower副本,這個 時候如果沒有資料,則返回為空 

follower副本收到response以後

  1. 1. 如果有資料則寫本地日誌,並且更新LEO
  2. 2. 更新follower的HW值  到目前為止,資料的同步就完成了,意味著消費端能夠消 費offset=0這條訊息。 

follower 的 fetch 請求是直接從阻塞過程中觸發 
前面說過,由於 leader 副本暫時沒有資料過來,所以 follower 的 fetch 會被阻塞,直到等待超時或者 leader 接收到新的資料。當leader收到請求以後會喚醒處於阻塞的 fetch請求。處理過程基本上和前面說的一直

  1. 1. leader將訊息寫入本地日誌,更新Leader的LEO
  2. 2. 喚醒follower的fetch請求
  3. 3. 更新HW 

kafka使用HW和LEO的方式來實現副本資料的同步,本 身是一個好的設計,但是在這個地方會存在一個數據丟失 的問題,當然這個丟失只出現在特定的背景下。我們回想 一下,HW的值是在新的一輪FETCH 中才會被更新。我們分析下這個過程為什麼會出現資料丟失 .

資料丟失的問題 
前提:min.insync.replicas=1的時候。 ->設定ISR中的最小 副本數是多少,預設值為 1, 當且僅當 acks 引數設定為-1 (表示需要所有副本確認)時,此引數才生效. 表達的含義 是,至少需要多少個副本同步才能表示訊息是提交的 所以,當min.insync.replicas=1的時候 一旦訊息被寫入 leader 端 log 即被認為是“已提交”,而延 遲一輪 FETCH RPC 更新 HW 值的設計使得 follower HW 值是非同步延遲更新的,倘若在這個過程中leader發生變更, 那麼成為新 leader 的 follower 的 HW 值就有可能是過期 的,使得clients端認為是成功提交的訊息被刪除
 

資料丟失的解決方案 
在 kafka0.11.0.0 版本以後,提供了一個新的解決方案,使用leader epoch來解決這個問題,leader epoch實際上是 一對之(epoch,offset), epoch 表示 leader 的版本號,從 0 開始,當leader變更過1次時epoch就會+1,而 offset則 對應於該 epoch 版本的 leader 寫入第一條訊息的位移。 比如說 (0,0) ; (1,50);  表示第一個leader從offset=0開始寫訊息, 一共寫了50條,第二個leader版本號是1,從50條處開始寫訊息。這個資訊儲存在對應分割槽的本地磁碟檔案中, 文 件 名 為 : /tml/kafka-log/topic/leader-epochcheckpoint 
 
leader broker中會儲存這樣的一個快取,並定期地寫入到 一個checkpoint檔案中。 當 leader 寫 log 時它會嘗試更新整個快取——如果這個 leader 首次寫訊息,則會在快取中增加一個條目;否則就 不做更新。而每次副本重新成為leader時會查詢這部分緩 存,獲取出對應leader版本的offset 

如何處理所有的Replica不工作的情況 
在 ISR 中至少有一個 follower 時,Kafka 可以確保已經 commit 的資料不丟失,但如果某個 Partition 的所有 Replica都宕機了,就無法保證資料不丟失了

  1. 等待 ISR 中的任一個 Replica“活”過來,並且選它作為 Leader
  2. 選擇第一個“活”過來的Replica(不一定是ISR中的)作為Leader 這就需要在可用性和一致性當中作出一個簡單的折衷。 如果一定要等待 ISR 中的 Replica“活”過來,那不可用的時 間就可能會相對較長。而且如果ISR中的所有Replica都無 法“活”過來了,或者資料都丟失了,這個 Partition 將永遠不可用。 選擇第一個“活”過來的 Replica 作為 Leader,而這個 Replica 不是 ISR 中的 Replica,那即使它並不保證已經包 含了所有已 commit 的訊息,它也會成為 Leader 而作為 consumer 的資料來源(前文有說明,所有讀寫都由 Leader 完成)。 

ISR 的設計原理 
在所有的分散式儲存中,冗餘備份是一種常見的設計方式,而常用的模式有同步複製和非同步複製,按照kafka這個副本模型 來說 如果採用同步複製,那麼需要要求所有能工作的 Follower 副本都複製完,這條訊息才會被認為提交成功,一旦有一個 follower副本出現故障,就會導致HW無法完成遞增,訊息就無法提交,消費者就獲取不到訊息。這種情況下,故障的 Follower副本會拖慢整個系統的效能,設定導致系統不可用 。

如果採用非同步複製,leader副本收到生產者推送的訊息後,就認為次訊息提交成功。follower 副本則非同步從 leader 副本同步。這種設計雖然避免了同步複製的問題,但是假設所有 follower副本的同步速度都比較慢,他們儲存的訊息量遠遠落後 於 leader 副本。而此時 leader 副本所在的 broker 突然宕機, 則會重新選舉新的 leader 副本,而新的 leader 副本中沒有原來leader副本的訊息。這就出現了訊息的丟失。

kafka 權衡了同步和非同步的兩種策略,採用 ISR 集合,巧妙解 決了兩種方案的缺陷:

  1. 當follower副本延遲過高,leader副本則會把該follower副本剔除ISR集合,訊息依然可以快速提交。
  2. 當 leader 副本所在的 broker 突然宕機,會優先將 ISR 集合中 follower 副本選舉為 leader,新 leader 副本包含了 HW 之前 的全部訊息,這樣就避免了訊息的