1. 程式人生 > >Kafka工作流程-KafkaCluster和Kafka 高可靠性儲存

Kafka工作流程-KafkaCluster和Kafka 高可靠性儲存

1.KafkaCluster

    在使用 Kafka 低階消費者時,可以通過 KafkaCluster 類實現 offset 向 ZooKeeper 的提交 和獲取。

    Kafka 協議非常簡單,只有六個核心客戶端請求 API:

        元資料(Metadata) - 描述當前可用的代理,主機和埠資訊,並提供有關哪個代理主機分割槽的資訊。

        傳送(Send) - 傳送訊息給經紀人

        獲取(Fetch) - 從代理獲取訊息,獲取資料,獲取叢集元資料,獲取關於主題的 偏移資訊的訊息。

        偏移量(Offsets) - 獲取有關給定主題分割槽的可用偏移量的資訊。

        偏移提交(Offset Commit)- 為消費者組提供一組偏移量

        偏移獲取(Offset Fetch) - 為消費者組取得一組偏移量

2.Kafka 高可靠性儲存

    Kafka 的高可靠性的保障來源於其健壯的副本(replication)策略。通過調節其副本相關 引數,可以使得 Kafka 在效能和可靠性之間運轉的遊刃有餘。Kafka 從 0.8.x 版本開始提供 partition 級別的複製,replication 的數量可以在$KAFKA_HOME/config/server.properties 中配 置(default.replication.refactor)。

    Kafka 檔案儲存機制

    Kafka 中訊息是以 topic 進行分類的,生產者通過 topic 向 Kafka broker 傳送訊息,消費者通過 topic 讀取資料。

    為了便於說明問題,假設這裡只有一個 Kafka 叢集,且這個叢集只有一個 Kafka broker, 即只有一臺物理機。在這個 Kafka broker 中配置($KAFKA_HOME/config/ server.properties 中)log.dirs=/tmp/kafka-logs,以此來設定 Kafka 訊息檔案儲存目錄,與此同時建立一個 topic: topic_zzh_test,partition 的數量為 4($KAFKA_HOME/bin/kafka-topics.sh –create – zookeeper localhost:2181 –partitions 4 –topic topic_zzh_test –replication-factor 4)。那麼我 們此時可以在/tmp/kafka-logs 目錄中可以看到生成了 4 個目錄:

drwxr-xr-x 2 root root 4096 Apr 10 16:10 topic_zzh_test-0 

drwxr-xr-x 2 root root 4096 Apr 10 16:10 topic_zzh_test-1 

drwxr-xr-x 2 root root 4096 Apr 10 16:10 topic_zzh_test-2 

drwxr-xr-x 2 root root 4096 Apr 10 16:10 topic_zzh_test-3

    在 Kafka 檔案儲存中,同一個 topic 下有多個不同的 partition,每個 partiton 為一個目錄, partition 的名稱規則為:topic 名稱+有序序號,第一個序號從 0 開始計,最大的序號為 partition 數量減 1,partition 是實際物理上的概念,而 topic 是邏輯上的概念。

    partition 還可以細分為 segment,這個 segment 又是什麼?

    如果就以 partition 為最小儲存單位,我們可以想象當 Kafka producer 不斷髮送訊息,必 然會引起 partition 檔案的無限擴張,這樣對於訊息檔案的維護以及已經被消費的訊息的清理 帶來嚴重的影響,所以這裡以 segment 為單位又將 partition 細分。每個 partition(目錄)相當於 一個巨型檔案被平均分配到多個大小相等的 segment(段)資料檔案中(每個 segment 檔案中 訊息數量不一定相等),這種特性也方便 old segment 的刪除,即方便已被消費的訊息的清 理,提高磁碟的利用率。每個 partition 只需要支援順序讀寫就行,segment 的檔案生命週期 由服務端配置引數(log.segment.bytes,log.roll.{ms,hours}等若干引數)決定。

    segment 檔案由兩部分組成,分別為“.index”檔案和“.log”檔案,分別表示為 segment 索引檔案和資料檔案(引入索引檔案的目的就是便於利用二分查詢快速定位 message 位置)。 這兩個檔案的命令規則為:partition 全域性的第一個 segment 從 0 開始,後續每個 segment 文 件名為上一個 segment 檔案最後一條訊息的 offset 值,數值大小為 64 位,20 位數字字元長 度,沒有數字用 0 填充,如下:

以上面的 segment 檔案為例,展示出 segment:00000000000000170410 的“.index”文 件和“.log”檔案的對應的關係,如下圖:

    如上圖所示,“.index”索引檔案儲存大量的元資料,“.log”資料檔案儲存大量的 訊息,索引檔案中的元資料指向對應資料檔案中 message 的物理偏移地址。其中以“.index” 索引檔案中的元資料[3, 348]為例,在“.log”資料檔案表示第 3 個訊息,即在全域性 partition 中表示 170410+3=170413 個訊息,該訊息的物理偏移地址為 348。

    那麼如何從 partition 中通過 offset 查詢 message 呢?

    以上圖為例,讀取 offset=170418 的訊息,首先查詢 segment 檔案,其中 00000000000000000000.index 為最開始的檔案,第二個檔案為 00000000000000170410.index (起始偏移為 170410+1=170411),而第三個檔案為 00000000000000239430.index(起始偏 移為 239430+1=239431),所以這個 offset=170418 就落到了第二個檔案之中。其他後續文 件可以依次類推,以其偏移量命名並排列這些檔案,然後根據二分查詢法就可以快速定位到 具體檔案位置。其次根據 00000000000000170410.index 檔案中的[8,1325]定位到 00000000000000170410.log 檔案中的 1325 的位置進行讀取。

    要是讀取 offset=170418 的訊息,從 00000000000000170410.log 檔案中的 1325 的位置進 行讀取,那麼怎麼知道何時讀完本條訊息,否則就讀到下一條訊息的內容了?

    這個就需要聯絡到訊息的物理結構了,訊息都具有固定的物理結構,包括:offset(8 Bytes)、訊息體的大小(4 Bytes)、crc32(4 Bytes)、magic(1 Byte)、attributes(1 Byte)、 key length(4 Bytes)、key(K Bytes)、payload(N Bytes)等等欄位,可以確定一條訊息的大 小,即讀取到哪裡截止。

3. 複製原理和同步方式

    Kafka 中 topic 的每個 partition 有一個預寫式的日誌檔案,雖然 partition 可以繼續細分為 若干個 segment 檔案,但是對於上層應用來說可以將 partition 看成最小的儲存單元(一個由 多個 segment 檔案拼接的“巨型”檔案),每個 partition 都由一些列有序的、不可變的訊息 組成,這些訊息被連續的追加到 partition 中。

    圖 2-13 中有兩個新名詞:HW 和 LEO。這裡先介紹下 LEO,LogEndOffset 的縮寫,表示每個 partition 的 log 最後一條 Message 的位置。HW 是 HighWatermark 的縮寫,是指 consumer 能夠看到的此 partition 的位置,這個涉及到多副本的概念。

    為了提高訊息的可靠性,Kafka 每個 topic 的 partition 有 N 個副本(replicas), 其中 N(大於等於 1)是 topic 的複製因子(replica fator)的個數。Kafka 通過多副本機制實現 故障自動轉移,當 Kafka 叢集中一個 broker 失效情況下仍然保證服務可用。在 Kafka 中發生 複製時確保 partition 的日誌能有序地寫到其他節點上,N 個 replicas 中,其中一個 replica 為 leader,其他都為 follower,leader 處理 partition 的所有讀寫請求,與此同時,follower 會被 動定期地去複製 leader 上的資料。

    如下圖所示,Kafka 叢集中有 4 個 broker, 某 topic 有 3 個 partition,且複製因子即副本個 數也為 3:

    Kafka 提供了資料複製演算法保證,如果 leader 發生故障或掛掉,一個新 leader 被選舉並 被接受客戶端的訊息成功寫入。Kafka 確保從同步副本列表中選舉一個副本為 leader,或者說 follower 追趕 leader 資料。leader 負責維護和跟蹤 ISR(In-Sync Replicas 的縮寫,表示副 本同步佇列,具體可參考下節)中所有 follower 滯後的狀態。當 producer 傳送一條訊息到 broker 後,leader 寫入訊息並複製到所有 follower。訊息提交之後才被成功複製到所有的同 步副本。訊息複製延遲受最慢的 follower 限制,重要的是快速檢測慢副本,如果 follower“落 後”太多或者失效,leader 將會把它從 ISR 中刪除。

4. ISR

    ISR(In-Sync Replicas),副本同步佇列。ISR 中包括 leader 和 follower。副本數對 Kafka 的吞吐率是有一定的影響,但極大的增強了可用性。預設情況下 Kafka 的 replica 數量為 1, 即每個 partition 都有一個唯一的 leader,為了確保訊息的可靠性,通常應用中將其值(由 broker 的引數 offsets.topic.replication.factor 指定)大小設定為大於 1,比如 3。 所有的副本(replicas) 統稱為 Assigned Replicas,即 AR。ISR 是 AR 中的一個子集,由 leader 維護 ISR 列表,follower 從 leader 同步資料有一些延遲(包括延遲時replica.lag.time.max.ms 和延遲條數 replica.lag.max.messages兩個維度, 當前的0.10.x及以上版本中只支援replica.lag.time.max.ms 這個維度),任意一個超過閾值都會把 follower 剔除出 ISR, 存入OSR(Outof-Sync Replicas) 列表,新加入的 follower 也會先存放在 OSR 中。

    AR=ISR+OSR

    Kafka 0.10.x 版 本 後 移 除 了 replica.lag.max.messages 參 數 , 只 保 留 了replica.lag.time.max.ms 作為 ISR 中副本管理的引數。為什麼這樣做呢?

    replica.lag.max.messages 表示當前某個副本落後 leaeder 的訊息數量超過了這個引數的 值,那麼 leader 就會把 follower 從 ISR 中刪除。假設設定 replica.lag.max.messages=4,那麼 如果 producer 一次傳送至 broker 的訊息數量都小於 4 條時,因為在 leader 接受到 producer 傳送的訊息之後而 follower 副本開始拉取這些訊息之前,follower 落後 leader 的訊息數不會 超過 4 條訊息,故此沒有 follower 移出 ISR,所以這時候 replica.lag.max.message 的設定似乎 是合理的。但是 producer 發起瞬時高峰流量,producer 一次傳送的訊息超過 4 條時,也就是 超過 replica.lag.max.messages,此時 follower 都會被認為是與 leader 副本不同步了,從而被踢出了 ISR。但實際上這些 follower 都是存活狀態的且沒有效能問題,那麼在之後追上 leader,並被重新加入了 ISR,於是就會出現它們不斷地剔出 ISR 然後重新迴歸 ISR,這無疑增加了無謂的效能損耗。而且這個引數是 broker 全域性的。設定太大了,影響真正“落後”follower 的移除;設定的太小了,導致 follower 的頻繁進出。無法給定一個合適的 replica.lag.max.messages 的值,故此,新版本的 Kafka 移除了這個引數。

    HW,HighWatermark 的縮寫,俗稱高水位,取一個 partition 對應的 ISR 中最小的 LEO 作為 HW,consumer 最多隻能消費到 HW 所在的位置。每個 replica 都有 HW,leader 和 follower 各自負責更新自己的 HW 的狀態。對於 leader 新寫入的訊息,consumer 不能立刻消費,leader 會等待該訊息被所有 ISR 中的 replicas 同步後更新 HW,此時訊息才能被 consumer 消費,這 樣就保證瞭如果 leader 所在的 broker 失效,該訊息仍然可以從新選舉的 leader 中獲取。對於 來自內部 broker 的讀取請求,沒有 HW 的限制。

    (一個 partition 的副本分為 Leader 和 Follower,Leader 和 Follower 都維護了 HW 和 LEO, 而 partition 的讀寫都在 Leader 完成,Leader 的 HW 是所有 ISR 列表裡副本中最小的那個的 LEO,當 Leader 新寫入訊息後,Leader 的 LEO 更新到加入新訊息後的位置,Leader 的 HW 仍在原位置,當所有的 Follower 都同步完成後,HW 更新到最新位置,此時最新寫入的訊息 才能被 Consumer 消費)

圖 2-15 詳細的說明了當 producer 生產訊息至 broker 後,ISR 以及 HW 和 LEO 的流轉過 程:

    由此可見,Kafka 的複製機制既不是完全的同步複製,也不是單純的非同步複製。事實上, 同步複製要求所有能工作的 follower 都複製完,這條訊息才會被 commit,這種複製方式極 大的影響了吞吐率。而非同步複製方式下,follower 非同步的從 leader 複製資料,資料只要被 leader 寫入 log 就被認為已經 commit,這種情況下如果 follower 都還沒有複製完,落後於 leader 時,突然 leader 宕機,則會丟失資料。而 Kafka 的這種使用 ISR 的方式則很好的均衡了確保 資料不丟失以及吞吐率。

    Kafka 的 ISR 的管理最終都會反饋到 Zookeeper 節點上。具體位置為: /brokers/topics/[topic]/partitions/[partition]/state。目前有兩個地方會對這個 Zookeeper 的節點 進行維護:

    Controller:Kafka 叢集中的其中一個 Broker 會被選舉為 Controller,主要負責 Partition 管理和副本狀態管理,也會執行類似於重分配 partition 之類的管理任務。在符合某些特定條 件下,Controller 下的 LeaderSelector 會選舉新的 leader,將 ISR 和新的 leader_epoch 及 controller_epoch 寫入 Zookeeper 的相關節點中。同時發起 LeaderAndIsrRequest 通知所有的 replicas。

    Leader:Leader 有單獨的執行緒定期檢測 ISR 中 Follower 是否脫離 ISR,如果發現 ISR 變 化,則會將新的 ISR 的資訊返回到 Zookeeper 的相關節點中。

5. 資料可靠性和永續性保證

    當 producer 向 leader 傳送資料時,可以通過 request.required.acks 引數來設定資料可靠性的級別:

    1(預設):producer 等待 broker 的 ack,partition 的 leader 落盤成功後返回 ack,如果在 follower 同步成功之前 leader 故障,那麼將會丟失資料;

    0:producer 不等待 broker 的 ack,這一操作提供了一個最低的延遲,broker 一接收到還沒有寫入磁碟就已經返回,當 broker 故障時有可能丟失資料;

    -1:producer 等待 broker 的 ack,partition 的 leader 和 follower 全部落盤成功後才返回 ack, 資料一般不會丟失,延遲時間長但是可靠性最高。但是這樣也不能保證資料不丟失,比如當 ISR 中只有 leader 時(前面 ISR 那一節講到,ISR 中的成員由於某些情況會增加也會減少, 最少就只剩一個 leader),這樣就變成了 acks=1 的情況;

    如果要提高資料的可靠性,在設定 request.required.acks=-1 的同時,也要 min.insync.replicas 這個引數(可以在 broker 或者 topic 層面進行設定)的配合,這樣才能發揮 最大的功效。min.insync.replicas 這個引數設定 ISR 中的最小副本數是多少,預設值為 1,當 且僅當 request.required.acks 引數設定為-1 時,此引數才生效。如果 ISR 中的副本數少於 min.insync.replicas 配置的數量時,客戶端會返回異常:org.apache.kafka. common.errors. NotEnoughReplicasExceptoin: Messages are rejected since there are fewer in-sync replicas than required。

    接下來對 ack=1 和-1 的兩種情況進行詳細分析:

(1) Request.required.acks = 1

    producer 傳送資料到 Leader,Leader 寫本地日誌成功,返回客戶端成功;此時 ISR 中的 副本還沒有來得及拉取該訊息,Leader 就宕機了,那麼此次傳送的訊息就會丟失 producer 傳送資料到 Leader,Leader 寫本地日誌成功,返回客戶端成功;此時 ISR 中的副本還沒有 來得及拉取該訊息,Leader 就宕機了,那麼此次傳送的訊息就會丟失。

(2) Request.required.acks = -1

    同步(Kafka 預設為同步,即 producer.type=sync)的傳送模式,replication.factor>=2 且 min.insync.replicas>=2 的情況下,不會丟失資料。

    有兩種典型情況。acks=-1 的情況下(如無特殊說明,以下 acks 都表示為引數 request.required.acks),資料傳送到 leader,ISR 的 Follower 全部完 1 成資料同步後,Leader 此時掛掉,那麼會選舉出新的 Leader,資料不會丟失。

    acks=-1 的情況下,資料傳送到 leader 後 ,部分 ISR 的副本同步,leader 此時掛掉。比 如 follower1 和 follower2 都有可能變成新的 leader, producer 端會得到返回異常,producer 端 會重新發送資料,資料可能會重複。

    當然圖 2-17 中所示,如果在 leader crash 的時候,follower2 還沒有同步到任何資料,而且 follower2 被選舉為新的 leader 的話,這樣訊息就不會重複。

    考慮圖 2-18(即 acks=-1,部分 ISR 副本同步)中的另一種情況,如果在 Leader 掛掉的 時候,follower1 同步了訊息 4,5,follower2 同步了訊息 4,與此同時 follower2 被選舉為 leader, 那麼此時 follower1 中的多出的訊息 5 該做如何處理呢?

    這裡就需要 HW 的協同配合了。如前所述,一個 partition 中的 ISR 列表中,leader 的 HW 是所有 ISR 列表裡副本中最小的那個的 LEO。類似於木桶原理,水位取決於最低那塊 短板。

    如圖 2-19,某個 topic 的某 partition 有三個副本,分別為 A、B、C。A 作為 leader 肯定 是 LEO 最高,B 緊隨其後,C 機器由於配置比較低,網路比較差,故而同步最慢。這個時 候 A 機器宕機,這時候如果 B 成為 leader,假如沒有 HW,在 A 重新恢復之後會做同步 (makeFollower)操作,在宕機時 log 檔案之後直接做追加操作,而假如 B 的 LEO 已經達到了 A 的 LEO,會產生資料不一致的情況,所以使用 HW 來避免這種情況。

A 在做同步操作的時候,先將 log 檔案截斷到之前自己的 HW 的位置,即 3,之後再從 B 中拉取訊息進行同步。

    如果失敗的 follower 恢復過來,它首先將自己的 log 檔案截斷到上次 checkpointed 時刻 的 HW 的位置,之後再從 leader 中同步訊息。leader 掛掉會重新選舉,新的 leader 會發送“指 令”讓其餘的 follower 截斷至自身的 HW 的位置然後再拉取新的訊息。

注意:當 ISR 中的個副本的 LEO 不一致時,如果此時 leader 掛掉,選舉新的 leader 時 並不是按照 LEO 的高低進行選舉,而是按照 ISR 中的順序選舉,新的 leader 會發送“指令” 讓其餘的 follower 截斷至自身的 HW 的位置然後再拉取新的訊息。