1. 程式人生 > >kafka的工作原理分析(三) -- 高可用副本機制

kafka的工作原理分析(三) -- 高可用副本機制

一、副本機制簡介

在kafka中,topic是可以拆分為多個分割槽進行儲存資料的,每個分割槽儲存的資料都是不一樣的。在kafka的叢集環境下,為了避免出現單節點宕機導致的資料丟失迭代情況,kafka提供了一種分割槽資料的副本機制,保證在某個分割槽的讀寫節點宕機時,資料不會丟失。

bin/kafka-topics.sh --create --zookeeper 192.168.0.15:2181 --replication-factor 3 --partitions 1 --topic test 

建立topic的時候通過指定replication-factor可以指定分割槽總共建立多少個副本,副本數不能超過broker數目。這些副本中,分為leader副本和follower副本,leader副本負責資料的讀寫請求,follower副本負責從leader副本複製資料到follower副本中備份資料。這樣在當leader副本所在機器出現宕機的時候就可以在follower副本里面重新選舉出一個leader副本來進行資料讀寫操作從而保證資料不會丟失。

副本分割槽分配方式

基本的分配思路就是
將n Broker和待分配的i個Partition排序
將第i個Partition分配到第(i mod n)個Broker上
將第i個Partition的第j個副本分配到第((i + j) mod n)個 Broker上.
由於副本數目是不能超過broker數目,所以副本會均勻分配到broekr上面。

副本leader

get /brokers/topics/test3p/partitions/0/state 命令可以在zookeeper中查詢topic指定分割槽的副本leader

{"controller_epoch":12,"leader
":0,"version":1,"leader_epoch":0,"isr":[0,1]}

可以檢視到leader:0,0表示副本所在的broker的id。

ISR(in-Sync replicas )裡面維護的是所有與leader資料差異在閾值範圍內的副本所在的broker id列表,當producer向broker傳送一條訊息時,訊息會寫入到leader,同時follower會不斷髮起資料同步請求同步leader的資料,在這過程可能leader和follower之間的資料同步會存在差異。當資料差異在(replica.lag.time.max.ms)長的時間內副本資料依然沒有和leader資料達成一致,那麼這個follower 的broker id就會從isr維護的列表中剔除出去,知道資料重新達成和leader一致時才會再次加入到isr列表;同時如果副本broker與zookeeper斷開連線,同樣會從isr列表裡面剔除出去。這個isr就是leader的候選人,除去已經是leader的broker id,isr中其他的id都可能成為leader。

二、副本協同機制

概念瞭解

kafka訊息的讀寫操作都只會由leader節點來接收 和處理。follower副本只負責同步資料以及當leader副本,當leader所在的 broker 掛了以後,會從isr中維護的 follower 副本中選取新的 leader。 那麼follower副本維護資料與leader保持一致的過程是怎麼樣實現的呢?
首先介紹兩個副本協同過程需要了解的概念
LEO(log end offset)
日誌末端位移(log end offset),記錄了該副本底層 日誌(log)中下一條訊息的offset。如果LEO=10,那麼表示該副本儲存了10條訊息, 位移值範圍是[0, 9]。對於leader副本而言,leo在新訊息寫入時leo更新;對於follower而言,在向leader副本發起一次同步資料請求時,leader有新訊息寫入並且沒同步給follower,這時將訊息返回給follower,follower的leo更新。

HW(high watermark)
高水位,用來指示,在hw之前的offset訊息都是可以消費的,也就是訊息是已經提交的概念。例如hw是10,那麼0-9的offset’的訊息都是可以進行消費的訊息。
對於leader而言,hw更新需要根據producer設定的ack模式來確定。
1.ack設定為0表示不需要任何副本確認,這時候訊息在leader副本寫入之後直接更新leader分割槽hw,表示訊息是已提交可以消費。
2.ack設定為1表示只需要leader副本確認訊息,這時候訊息在leader副本寫入之後直接更新leader分割槽hw,並且向producer傳送確認ack,表示訊息是已提交可以消費。
3.ack設定為-1表示訊息需要n個副本確認訊息,這時候需要leader收到n個副本的同步訊息成功的ack之後才更新leader的hw並且傳送ack給producer確認,表示訊息是已提交可以消費。
這個n值指的是min.insync.replicas,設定ISR中的最小副本數是多少,預設值為 1, 當且僅當 acks 引數設定為-1 (表示需要所有副本確認)時,此引數才生效. 表達的含義 是,isr中至少有這麼多個副本,需要isr中所有副本確認才可以認為訊息已經提交。

對於follower而言,hw用來在重新選舉leader時,表明哪些訊息是可以消費的。

協同過程

下面畫圖展示一下ack -1情況下的同步過程。

首先是初始狀態,leader和follower都沒有資料,各自的leo和hw都是0,此時follower不斷向leader傳送fecth請求請求同步資料,,但是因為producer沒有資料,這個請求會被leader寄存,當在指定的時間之後 會 強 制 完 成 請 求 , 這 個 時 間 配 置 是 (replica.fetch.wait.max.ms),如果在指定時間內 producer 有訊息傳送過來,那麼kafka會喚醒fetch請求,讓leader 繼續處理。如下圖所示。
這裡寫圖片描述

當producer傳送來一條訊息的時候。
1. 把訊息追加到log檔案,同時更新leader副本的LEO
2. 嘗試更新leader HW值。leader會比較自己的LEO以及remote LEO的值 發現最小值是0,與HW的值相同,所以不會更新HW 。
如下圖所示。
這裡寫圖片描述

然後加入這時候剛好有fecth請求時或者有fecth請求處於阻塞 holding中就會喚醒這個fecth請求,leader會做以下動作。
1. 讀取log資料、根據fecth請求的leo更新remote LEO=0
2. 嘗試更新 HW,這時候Remote leo是0,leo是1,那麼表示Remote 的follower副本還沒有複製到這條訊息資料,follower副本沒確認這條訊息資料,那麼hw就不能更新,表明這條訊息還不可以消費。
3. 把訊息內容和當前分割槽的HW值傳送給follower副本

follower接收到響應時,做以下動作。
1. 將訊息寫入到本地log,同時更新follower的LEO
2. 更新 follower HW,本地的 LEO 和 leader 返回的 HW 進行比較取小的值,所以仍然是0
如下圖所示。
這裡寫圖片描述

然後follower再一次傳送fecth請求到leader,leader接收到這一次請求之後,把Remote leo更新為1**(如果isr是多個副本需要所有isr 維護的副本都把leo = 1請求到leader才可以更新Remote leo,如果某些follower實在同步太慢可以把它踢出isr副本列表,然後isr中剩餘的副本確認了也是可以的)**,然後leader更新hw=1,表明offset 0訊息是可以消費的,同時給producer傳送確認ack。
如下圖所示。
這裡寫圖片描述

最後一步就是返回response給follower,更新follower的hw=1.
如下圖所示。
這裡寫圖片描述

上面就是一條訊息在ack = -1的情況下協同過程。

資料丟失

在協同過程的最後一步,其實就是更新follower的hw值,假如這個過程leader宕機了,那麼follower的hw值沒法更新,還是0,。此時副本重新選舉leader,選中follower作為新的leader,這時就會按照hw = 0值去截斷訊息,認為offset = 0的訊息是不可消費的。從而導致訊息丟失。
這裡寫圖片描述

kafka使 用leader epoch來解決這個問題。在/kafka-log/topic/leader-epochcheckpoint 檔案中會儲存
leader epoch資訊,leader epoch是 一對之(epoch,offset), epoch 就是 leader 的次數號,從 0 開始,每次選舉leader就會+1,而 offset則 對應於該 epoch 版本的 leader 寫入第一條訊息的位移。 比如說 (0,0) , (1,100); 表示第一個leader從offset=0開始寫訊息, 一共寫100條,第二個leader版本號是1,從100offset開 始寫訊息。
leader broker中會儲存這樣的一個快取,並定期地寫入到 checkpoint檔案中。 當 leader 寫 log 時它會嘗試更新整個快取;如果這個 leader 首次寫訊息,則會在快取中增加一個條目;而當follower成為新的leader時會查詢這部分快取,獲取出對應leader版本的offset,確認哪些訊息是否有效的。

所有的分割槽副本不可用?

  1. 等待 ISR 中的任一個 Replica重新啟動,選它作為 Leader
  2. 選擇第一個重啟過來的Replica(不一定是ISR中的)作 為Leader
    等待isr中的副本啟動過來可能等待時間較長,但是可以保證資料完整;隨便一個副本作為leader的話可能比較快讓分割槽可用,但是資料可能會有丟失。

isr的設計原理

副本備份資料是分散式儲存中常見的一種手段。單純的同步備份需要要求所有能工作的 Follower 副 本都複製完,這條訊息才會被認為提交成功,一旦有一個 follower副本出現故障,就會導致HW無法完成遞增,消費者就獲取不到訊息。假如單純的非同步複製,那麼就可能出現數據丟失的情況,無法保證follower副本資料的完整性。is維護了資料完整的副本備份列表,如果是資料差異較大就會剔除出isr,當需要重新選舉leader的時候,從isr選,可以保證副本資料的完整性。通過設定min.insync.replicas可以設定isr中至少有幾個副本,當有訊息需要同步時,需要得到isr中所有的副本確認同步才可以被認為是提交成功的,如果某些副本實在同步太慢,leader就會把它踢出isr集合,然後得到剩餘的isr副本確認就可以了。所以follower要麼傳送同步確認(leo跟上leader),要麼被踢出isr,兩種方式保證了訊息提交不會慢,也保證資料不會丟失。