1. 程式人生 > >kafka的高可用和一致性探究

kafka的高可用和一致性探究

1 kafka基礎 本篇文章討論的kafka版本是目前最新版 0.10.1.0。 1.1 kafka種的KafkaController 所有broker會通過ZooKeeper選舉出一個作為KafkaController,來負責: 監...

1 kafka基礎

本篇文章討論的kafka版本是目前最新版 0.10.1.0。

1.1 kafka種的KafkaController

所有broker會通過ZooKeeper選舉出一個作為KafkaController,來負責:

  • 監控所有broker的存活,以及向他們傳送相關的執行命令。
  • 分割槽的狀態維護:負責分割槽的新增、下線等,分割槽副本的leader選舉
  • 副本的狀態維護:負責副本的新增、下線等

1.2 kafka分割槽中的基本概念

每個分割槽可以有多個副本,分散在不同的broker上。

  • leader副本:被KafkaController選舉出來的,作為該分割槽的leader
  • 其他follower副本:其他副本都作為follower副本
  • isr列表:簡單描述就是,“跟得上”leader的副本列表(包含leader),最開始是所有副本。這裡的跟得上是指
    • replica.lag.time.max.ms:在0.9.0.0之前表示follower如果在此時間間隔內沒有向leader傳送fetch請求,則該follower就會被剔除isr列表,在0.9.0.0之後表示如果該follower在此時間間隔內一直沒有追上過leader的所有訊息,則該follower就會被剔除isr列表
    • replica.lag.max.messages(0.9.0.0版本中已被廢除):follower如果落後leader的訊息個數超過該值,則該follower就會被剔除isr列表
      廢除的主要原因是:目前這個配置是個統一配置,不同的topic速率生產速率不太一樣,沒辦法來指定一個具體的值來應用到所有的topic上。將來可以將這個配置下放到topic級別,關於這個問題,可以見這裡的討論Automate replica lag tuning

    每一個producer傳送訊息給某個分割槽的leader副本,其他follower副本又會複製該訊息。producer端有一個acks引數可以設定:

    • acks=0:表示producer不需要leader傳送響應,即producer只管發不管傳送成功與否。延遲低,容易丟失資料。
    • acks=1:表示leader寫入成功(但是並沒有重新整理到磁碟)後即向producer響應。延遲中等,一旦leader副本掛了,就會丟失資料。
    • acks=-1:表示leader會等待isr列表中所有副本都寫入成功才向producer傳送響應。延遲高、可靠性高。但是也會丟資料,下面會詳細討論

    同時對於isr列表的數量要求也有一個配置

    • min.insync.replicas:預設是1。當acks=-1的時候,leader在處理新訊息前,會先判斷當前isr列表的的size是否小於這個值,如果小於的話,則不允許寫入,返回NotEnoughReplicasException異常。同時,一旦允許寫入了之後,在響應producer之前也會判斷當前isr列表的size是否小於該值,如果小於返回NotEnoughReplicasAfterAppendException異常

    我們本篇文章就重點通過kafka的原理來揭示在acks=-1的情況下,哪些情況下會丟失資料,或許可以提一些改進措施來做到不丟失資料。

    下面會先介紹下leader和follower副本複製的原理

    1.3 副本複製過程

    • leader副本的屬性
      • highWatermarkMetadata:代表已經被isr列表複製的最大offset,consumer只能消費該offset之前的資料
      • logEndOffsetMetadata:代表leader副本上已經複製的最大offset
    • leader副本擁有其他副本的記錄,儲存著他們的如下屬性:
      • logEndOffsetMetadata:代表該follower副本已經複製的最大offset
      • lastCaughtUpTimeMs:記錄該follower副本上一次追上leader副本的所有訊息的時間
    • follower副本的屬性
      • highWatermarkMetadata:follower會獲取到leader的highWatermarkMetadata更新到自己的該屬性中
      • logEndOffsetMetadata:代表follower副本上已經複製的最大offset
    • 其中follower會不斷的向leader傳送fetch請求,如果沒有資料fetch則被leader阻塞一段時間,等待新資料的來臨,一旦來臨則解除阻塞,複製資料給follower。

    我們來看下當producer的acks=-1時,一次訊息寫入的整個過程,上述是屬性是怎麼變化的

    • 1.3.1 訊息準備寫入leader副本,leader副本首先判斷當前isr列表是否小於min.insync.replicas,不小於才允許寫入。
      如果不小於,leader寫入到自己的log中,得到該訊息的offset,然後對其他follower的fetch請求解除阻塞,複製一定量的訊息給follower
      同時leader將自己最新的highWatermarkMetadata傳給follower
      同時會判斷這次複製是否複製到leader副本的末尾了,即logEndOffsetMetadata位置,如果是的話,則更新上述的lastCaughtUpTimeMs
    • 1.3.2 follower會將fetch來的資料寫入到自己的log中,自己的logEndOffsetMetadata得到了更新,同時更新自己的highWatermarkMetadata,就是取leader傳來的highWatermarkMetadata和自己的logEndOffsetMetadata中的最小值
      然後follower再一次向leader傳送fetch請求,fetch的初始offset就是自己的logEndOffsetMetadata+1。
    • 1.3.3 leader副本收到該fetch後,會更新leader副本中該follower的logEndOffsetMetadata為上述fetch的offset,同時會對所有的isr列表的logEndOffsetMetadata排序得到最小的logEndOffsetMetadata作為最新的highWatermarkMetadata
      如果highWatermarkMetadata已經大於了leader寫入該訊息的offset了,說明該訊息已經被isr列表都複製過了,則leader開始迴應producer
      判斷當前isr列表的size是否小於min.insync.replicas,如果小於返回NotEnoughReplicasAfterAppendException異常,不小於則代表正常寫入了。
    • 1.3.4 follower在下一次的fetch請求的響應中就會得到leader最新的highWatermarkMetadata,更新自己的highWatermarkMetadata

    1.4 leader副本選舉

    如果某個broker掛了,leader副本在該broker上的分割槽就要重新進行leader選舉。來簡要描述下leader選舉的過程

    • 1.4.1 KafkaController會監聽ZooKeeper的/brokers/ids節點路徑,一旦發現有broker掛了,執行下面的邏輯。這裡暫時先不考慮KafkaController所在broker掛了的情況,KafkaController掛了,各個broker會重新leader選舉出新的KafkaController
    • 1.4.2 leader副本在該broker上的分割槽就要重新進行leader選舉,目前的選舉策略是
      • 1.4.2.1 優先從isr列表中選出第一個作為leader副本
      • 1.4.2.2 如果isr列表為空,則檢視該topic的unclean.leader.election.enable配置。
    • unclean.leader.election.enable:為true則代表允許選用非isr列表的副本作為leader,那麼此時就意味著資料可能丟失,為false的話,則表示不允許,直接丟擲NoReplicaOnlineException異常,造成leader副本選舉失敗。
      • 1.4.2.3 如果上述配置為true,則從其他副本中選出一個作為leader副本,並且isr列表只包含該leader副本。
    • 一旦選舉成功,則將選舉後的leader和isr和其他副本資訊寫入到該分割槽的對應的zk路徑上。
    • 1.4.3 KafkaController向上述相關的broker上傳送LeaderAndIsr請求,將新分配的leader、isr、全部副本等資訊傳給他們。同時將向所有的broker傳送UpdateMetadata請求,更新每個broker的快取的metadata資料。
    • 1.4.4 如果是leader副本,更新該分割槽的leader、isr、所有副本等資訊。如果自己之前就是leader,則現在什麼操作都不用做。如果之前不是leader,則需將自己儲存的所有follower副本的logEndOffsetMetadata設定為UnknownOffsetMetadata,之後等待follower的fetch,就會進行更新
    • 1.4.5 如果是follower副本,更新該分割槽的leader、isr、所有副本等資訊
      然後將日誌截斷到自己儲存的highWatermarkMetadata位置,即日誌的logEndOffsetMetadata等於了highWatermarkMetadata
      最後建立新的fetch請求執行緒,向新leader不斷髮送fetch請求,初次fetch的offset是logEndOffsetMetadata。

    上述重點就是leader副本的日誌不做處理,而follower的日誌則需要截斷到highWatermarkMetadata位置。

    至此,算是簡單描述了分割槽的基本情況,下面就針對上述過程來討論下kafka分割槽的高可用和一致性問題。

    2 訊息丟失

    2.1 訊息丟失的場景

    哪些場景下會丟失訊息?

    • acks= 0、1,很明顯都存在訊息丟失的可能。
    • 即使設定acks=-1,當isr列表為空,如果unclean.leader.election.enable為true,則會選擇其他存活的副本作為新的leader,也會存在訊息丟失的問題。
    • 即使設定acks=-1,當isr列表為空,如果unclean.leader.election.enable為false,則不會選擇其他存活的副本作為新的leader,即犧牲了可用性來防止上述訊息丟失問題。
    • 即使設定acks=-1,並且選出isr中的副本作為leader的時候,仍然是會存在丟資料的情況的:
      s1 s2 s3是isr列表,還有其他副本為非isr列表,s1是leader,一旦某個日誌寫入到s1 s2 s3,則s1將highWatermarkMetadata提高,並回復了客戶端ok,但是s2 s3的highWatermarkMetadata可能還沒被更新,此時s1掛了,s2當選leader了,s2的日誌不變,但是s3就要截斷日誌了,這時已經回覆客戶端的日誌是沒有丟的,因為s2已經複製了。
      但是如果此時s2一旦掛了,s3當選,則s3上就不存在上述日誌了(前面s2當選leader的時候s3已經將日誌截斷了),這時候就造成日誌丟失了。

    2.2 不丟訊息的探討

    其實我們是希望上述最後一個場景能夠做到不丟訊息的,但是目前的做法還是可能會丟訊息的。

    丟訊息最主要的原因是:

    由於follower的highWatermarkMetadata相對於leader的highWatermarkMetadata是延遲更新的,當leader選舉完成後,所有follower副本的截斷到自己的highWatermarkMetadata位置,則可能截斷了已被老leader提交了的日誌,這樣的話,這部分日誌僅僅存在新的leader副本中,在其他副本中消失了,一旦leader副本掛了,這部分日誌就徹底丟失了

    這個截斷到highWatermarkMetadata的操作的確太狠了,但是它的用途有一個就是:**避免了日誌的不一致的問題**。通過每次leader選舉之後的日誌截斷,來達到和leader之間日誌的一致性,避免出現日誌錯亂的情況。

    ZooKeeper和Raft的實現也有類似的日誌複製的問題,那ZooKeeper和Raft的實現有沒有這種問題呢?他們是如何解決的呢?

    Raft並不進行日誌的截斷操作,而是會通過每次日誌複製時的一致性檢查來進行日誌的糾正,達到和leader來保持一致的目的。不截斷日誌,那麼對於已經提交的日誌,則必然存在過半的機器上從而能夠保證日誌基本是不會丟失的。

    ZooKeeper只有當某個follower的記錄超出leader的部分才會截斷,其他的不會截斷的。選舉出來的leader是經過過半pk的,必然是包含全部已經被提交的日誌的,即使該leader掛了,再次重新選舉,由於不進行日誌截斷,仍然是可以選出其他包含全部已提交的日誌的(有過半的機器都包含全部已提交的日誌)。ZooKeeper對於日誌的糾正則是在leader選舉完成後專門開啟一個糾正過程。

    kafka的截斷到highWatermarkMetadata的確有點太粗暴了,如果不截斷日誌,則需要解決日誌錯亂的問題,即使不能夠像ZooKeeper那樣花大代價專門開啟一個糾正過程,可以像Raft那樣每次在fetch的時候可以進行不斷的糾正。這一塊還有待繼續關注。

    3 順序性

    kafka目前是隻能保證一個分割槽內的資料是有序的。

    但是你可能經常聽說,一旦某個broker掛了,就可能產生亂序問題(也沒人指出亂序的原因),是否正確呢?

    首先來看看如何能保證單個分割槽內訊息的有序性,有如下幾個過程:

    • 3.1 producer按照訊息的順序進行傳送
      很多時候為了傳送效率,採用的辦法是多執行緒、非同步、批量傳送。
      如果為了保證順序,則不能使用多執行緒來執行傳送任務。
      非同步:一般是把訊息先發到一個佇列中,由後臺執行緒不斷的執行傳送任務。這種方式對訊息的順序也是有影響的:
      如先發送訊息1,後傳送訊息2,此時伺服器端在處理訊息1的時候返回了異常,可能在處理訊息2的時候成功了,此時若再重試訊息1就會造成訊息亂序的問題。所以producer端需要先確認訊息1傳送成功了才能執行訊息2的傳送。
      對於kafka來說,目前是非同步、批量傳送。解決非同步的上述問題就是配置如下屬性:
      max.in.flight.requests.per.connection=1

      即producer發現一旦還有未確認傳送成功的訊息,則後面的訊息不允許傳送。
    • 3.2 相同key的訊息能夠hash到相同的分割槽
      正常情況下是沒問題的,但是一旦某個分割槽掛了,如原本總共4個分割槽,此時只有3個分割槽存活,在此分割槽恢復的這段時間內,是否會存在hash錯亂到別的分割槽?
      那就要看producer端獲取的metadata資訊是否會立馬更新成3個分割槽。目前看來應該是不會的
      producer見到的metadata資料是各個broker上的快取資料,這些快取資料是由KafkaController來統一進行更新的。一旦leader副本掛了,KafkaController並不會去立馬更新成3個分割槽,而是去執行leader選舉,選舉完成後才會去更新metadata資料,此時選舉完成後仍然是保證4個分割槽的,也就是說producer是不可能獲取到只有3個分割槽的metadata資料的,所以producer端還是能正常hash的,不會錯亂分割槽的。
      在整個leader選舉恢復過程,producer最多是無法寫入資料(後期可以重試)。
    • 3.3 系統對順序訊息的支援
      leader副本按照訊息到來的先後順序寫入本地日誌的,一旦寫入日誌後,該順序就確定了,follower副本也是按照該順序進行復制的。對於訊息的提交也是按照訊息的offset從低到高來確認提交的,所以說kafka對於訊息的處理是順序的。
    • 3.4 consumer能夠按照訊息的順序進行消費
      為了接收的效率,可能會使用多執行緒進行消費。這裡為了保證順序就只能使用單執行緒來進行消費了。
      目前kafka的Consumer有scala版本的和java版本的(這一塊之後再詳細探討),最新的java版本,對使用者提供一個poll方法,使用者自己去決定是使用多執行緒還是單執行緒。

    4 其他話題

    • 如何看待kafka的isr列表設計?和過半怎麼對比呢?
      對於相同數量的2n個follower加一個leader,過半呢則允許n個follower掛掉,而isr呢則允許2n個follower掛掉(但是會存在丟失訊息的問題),所以過半更多會犧牲可用性(掛掉一半以上就不可用了)來增強資料的一致性,而isr會犧牲一致性來增強可用性(掛掉一半以上扔可使用,但是存在丟資料的問題)
      但是在確認效率上:過半僅僅需要最快的n+1的寫入成功即可判定為成功,而isr則需要2n+1的寫入成功才算成功。同時isr是動態變化的過程,一旦跟不上或者跟上了都會離開或者加入isr列表。isr列表越小寫入速度就會加快。
    • 有哪些環節會造成訊息的重複消費?如果避免不了,如何去減少重複?
      • producer端重複傳送
    • producer端因傳送超時等等原因做重試操作,目前broker端做重複請求的判斷還是很難的,目前kafka也沒有去做,而是儲存完訊息之後,如果開啟了Log compaction,它會通過kafka訊息中的key來判定是否是重複訊息,是的話則會刪除。
      • consumer消費後,未及時提交消費的offset便掛了,下次恢復後就會重複消費
    • 這個目前來說並沒有通用的解決辦法,先消費後提交offset可能會重複,先提交offset後消費可能造成訊息丟失,所以一般還是優先保證訊息不丟,在業務上去做去重判斷。