1. 程式人生 > >Kafka的架構原理,你真的理解嗎?

Kafka的架構原理,你真的理解嗎?

Apache Kafka 最早是由 LinkedIn 開源出來的分散式訊息系統,現在是 Apache 旗下的一個子專案,並且已經成為開源領域應用最廣泛的訊息系統之一。

 

Kafka 社群非常活躍,從 0.9 版本開始,Kafka 的標語已經從“一個高吞吐量,分散式的訊息系統”改為"一個分散式流平臺"。

 

Kafka 和傳統的訊息系統不同在於:

  • Kafka是一個分散式系統,易於向外擴充套件。

  • 它同時為釋出和訂閱提供高吞吐量。

  • 它支援多訂閱者,當失敗時能自動平衡消費者。

  • 訊息的持久化。

 

Kafka 和其他訊息佇列的對比:

入門例項

 

生產者

 

程式碼如下:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
public class UserKafkaProducer extends Thread
{
    private final KafkaProducer<Integer, String> producer;
    private final String topic;
    private final Properties props = new Properties();
    public UserKafkaProducer(String topic)
    {
        props.put("metadata.broker.list", "localhost:9092");
        props.put("bootstrap.servers", "master2:6667");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producer = new KafkaProducer<Integer, String>(props);
        this.topic = topic;
    }
@Override
    public void run() {
        int messageNo = 1;
        while (true)
        {
            String messageStr = new String("Message_" + messageNo);
            System.out.println("Send:" + messageStr);
            //返回的是Future<RecordMetadata>,非同步傳送
            producer.send(new ProducerRecord<Integer, String>(topic, messageStr));
            messageNo++;
            try {
                sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

 

消費者

 

程式碼如下:

Properties props = new Properties();
/* 定義kakfa 服務的地址,不需要將所有broker指定上 */
props.put("bootstrap.servers", "localhost:9092");
/* 制定consumer group */
props.put("group.id", "test");
/* 是否自動確認offset */
props.put("enable.auto.commit", "true");
/* 自動確認offset的時間間隔 */
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
/* key的序列化類 */
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
/* value的序列化類 */
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
 /* 定義consumer */
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
/* 消費者訂閱的topic, 可同時訂閱多個 */
consumer.subscribe(Arrays.asList("foo", "bar"));
 /* 讀取資料,讀取超時時間為100ms */
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records)
        System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
}

 

Kafka 架構原理

 

對於 Kafka 的架構原理,我們先提出如下幾個問題:

  • Kafka 的 topic 和分割槽內部是如何儲存的,有什麼特點?

  • 與傳統的訊息系統相比,Kafka 的消費模型有什麼優點?

  • Kafka 如何實現分散式的資料儲存與資料讀取?

 

Kafka 架構圖

 

Kafka 名詞解釋

 

在一套 Kafka 架構中有多個 Producer,多個 Broker,多個 Consumer,每個 Producer 可以對應多個 Topic,每個 Consumer 只能對應一個 Consumer Group。

 

整個 Kafka 架構對應一個 ZK 叢集,通過 ZK 管理叢集配置,選舉 Leader,以及在 Consumer Group 發生變化時進行 Rebalance。

Topic 和 Partition

 

在 Kafka 中的每一條訊息都有一個 Topic。一般來說在我們應用中產生不同型別的資料,都可以設定不同的主題。

 

一個主題一般會有多個訊息的訂閱者,當生產者釋出訊息到某個主題時,訂閱了這個主題的消費者都可以接收到生產者寫入的新訊息。

 

Kafka 為每個主題維護了分散式的分割槽(Partition)日誌檔案,每個 Partition 在 Kafka 儲存層面是 Append Log。

 

任何釋出到此 Partition 的訊息都會被追加到 Log 檔案的尾部,在分割槽中的每條訊息都會按照時間順序分配到一個單調遞增的順序編號,也就是我們的 Offset。Offset 是一個 Long 型的數字。

 

我們通過這個 Offset 可以確定一條在該 Partition 下的唯一訊息。在 Partition 下面是保證了有序性,但是在 Topic 下面沒有保證有序性。

在上圖中我們的生產者會決定傳送到哪個 Partition:

  • 如果沒有 Key 值則進行輪詢傳送。

  • 如果有 Key 值,對 Key 值進行 Hash,然後對分割槽數量取餘,保證了同一個 Key 值的會被路由到同一個分割槽;如果想佇列的強順序一致性,可以讓所有的訊息都設定為同一個 Key。

 

消費模型

 

訊息由生產者傳送到 Kafka 集群后,會被消費者消費。一般來說我們的消費模型有兩種:

  • 推送模型(Push)

  • 拉取模型(Pull)

 

基於推送模型的訊息系統,由訊息代理記錄消費狀態。訊息代理將訊息推送到消費者後,標記這條訊息為已經被消費,但是這種方式無法很好地保證消費的處理語義。

 

比如當我們已經把訊息傳送給消費者之後,由於消費程序掛掉或者由於網路原因沒有收到這條訊息,如果我們在消費代理將其標記為已消費,這個訊息就永久丟失了。

 

如果我們利用生產者收到訊息後回覆這種方法,訊息代理需要記錄消費狀態,這種不可取。

 

如果採用 Push,訊息消費的速率就完全由消費代理控制,一旦消費者發生阻塞,就會出現問題。

 

Kafka 採取拉取模型(Poll),由自己控制消費速度,以及消費的進度,消費者可以按照任意的偏移量進行消費。

 

比如消費者可以消費已經消費過的訊息進行重新處理,或者消費最近的訊息等等。

 

網路模型

 

Kafka Client:單執行緒 Selector

 

單執行緒模式適用於併發連結數小,邏輯簡單,資料量小的情況。在 Kafka 中,Consumer 和 Producer 都是使用的上面的單執行緒模式。

 

這種模式不適合 Kafka 的服務端,在服務端中請求處理過程比較複雜,會造成執行緒阻塞,一旦出現後續請求就會無法處理,會造成大量請求超時,引起雪崩。而在伺服器中應該充分利用多執行緒來處理執行邏輯。

 

Kafka Server:多執行緒 Selector

 

在 Kafka 服務端採用的是多執行緒的 Selector 模型,Acceptor 執行在一個單獨的執行緒中,對於讀取操作的執行緒池中的執行緒都會在 Selector 註冊 Read 事件,負責服務端讀取請求的邏輯。

 

成功讀取後,將請求放入 Message Queue共享佇列中。然後在寫執行緒池中,取出這個請求,對其進行邏輯處理。

 

這樣,即使某個請求執行緒阻塞了,還有後續的執行緒從訊息佇列中獲取請求並進行處理,在寫執行緒中處理完邏輯處理,由於註冊了 OP_WIRTE 事件,所以還需要對其傳送響應。

 

高可靠分散式儲存模型

 

在 Kafka 中保證高可靠模型依靠的是副本機制,有了副本機制之後,就算機器宕機也不會發生資料丟失。

 

高效能的日誌儲存

 

Kafka 一個 Topic 下面的所有訊息都是以 Partition 的方式分散式的儲存在多個節點上。

 

同時在 Kafka 的機器上,每個 Partition 其實都會對應一個日誌目錄,在目錄下面會對應多個日誌分段(LogSegment)。

 

LogSegment 檔案由兩部分組成,分別為“.index”檔案和“.log”檔案,分別表示為 Segment 索引檔案和資料檔案。

 

這兩個檔案的命令規則為:Partition 全域性的第一個 Segment 從 0 開始,後續每個 Segment 檔名為上一個 Segment 檔案最後一條訊息的 Offset 值,數值大小為 64 位,20 位數字字元長度,沒有數字用 0 填充。

 

如下,假設有 1000 條訊息,每個 LogSegment 大小為 100,下面展現了 900-1000 的索引和 Log:

由於 Kafka 訊息資料太大,如果全部建立索引,既佔了空間又增加了耗時,所以 Kafka 選擇了稀疏索引的方式,這樣索引可以直接進入記憶體,加快偏查詢速度。

 

簡單介紹一下如何讀取資料,如果我們要讀取第 911 條資料首先第一步,找到它是屬於哪一段的。

 

根據二分法查詢到它屬於的檔案,找到 0000900.index 和 00000900.log 之後,然後去 index 中去查詢 (911-900) = 11 這個索引或者小於 11 最近的索引。

 

在這裡通過二分法我們找到了索引是 [10,1367],然後我們通過這條索引的物理位置 1367,開始往後找,直到找到 911 條資料。

 

上面講的是如果要找某個 Offset 的流程,但是我們大多數時候並不需要查詢某個 Offset,只需要按照順序讀即可。

 

而在順序讀中,作業系統會在記憶體和磁碟之間新增 Page Cache,也就是我們平常見到的預讀操作,所以我們的順序讀操作時速度很快。

 

但是 Kafka 有個問題,如果分割槽過多,那麼日誌分段也會很多,寫的時候由於是批量寫,其實就會變成隨機寫了,隨機 I/O 這個時候對效能影響很大。所以一般來說 Kafka 不能有太多的 Partition。

 

針對這一點,RocketMQ 把所有的日誌都寫在一個檔案裡面,就能變成順序寫,通過一定優化,讀也能接近於順序讀。

 

大家可以思考一下:

  • 為什麼需要分割槽,也就是說主題只有一個分割槽,難道不行嗎?

  • 日誌為什麼需要分段?

 

副本機制

 

Kafka 的副本機制是多個服務端節點對其他節點的主題分割槽的日誌進行復制。

 

當叢集中的某個節點出現故障,訪問故障節點的請求會被轉移到其他正常節點(這一過程通常叫 Reblance)。

 

Kafka 每個主題的每個分割槽都有一個主副本以及 0 個或者多個副本,副本保持和主副本的資料同步,當主副本出故障時就會被替代。

在 Kafka 中並不是所有的副本都能被拿來替代主副本,所以在 Kafka 的 Leader 節點中維護著一個 ISR(In Sync Replicas)集合。

 

翻譯過來也叫正在同步中集合,在這個集合中的需要滿足兩個條件:

  • 節點必須和 ZK 保持連線。

  • 在同步的過程中這個副本不能落後主副本太多。

 

另外還有個 AR(Assigned Replicas)用來標識副本的全集,OSR 用來表示由於落後被剔除的副本集合。

 

所以公式如下:ISR = Leader + 沒有落後太多的副本;AR = OSR+ ISR。

 

這裡先要說下兩個名詞:HW(高水位)是 Consumer 能夠看到的此 Partition 的位置,LEO 是每個 Partition 的 Log 最後一條 Message 的位置。

 

HW 能保證 Leader 所在的 Broker 失效,該訊息仍然可以從新選舉的 Leader 中獲取,不會造成訊息丟失。

 

當 Producer 向 Leader 傳送資料時,可以通過 request.required.acks 引數來設定資料可靠性的級別:

  • 1(預設):這意味著 Producer 在 ISR 中的 Leader 已成功收到的資料並得到確認後傳送下一條 Message。如果 Leader 宕機了,則會丟失資料。

  • 0:這意味著 Producer 無需等待來自 Broker 的確認而繼續傳送下一批訊息。這種情況下資料傳輸效率最高,但是資料可靠性卻是最低的。

  • -1:Producer 需要等待 ISR 中的所有 Follower 都確認接收到資料後才算一次傳送完成,可靠性最高。

    但是這樣也不能保證資料不丟失,比如當 ISR 中只有 Leader 時(其他節點都和 ZK 斷開連線,或者都沒追上),這樣就變成了 acks = 1 的情況。

 

高可用模型及冪等

 

 在分散式系統中一般有三種處理語義:

 

at-least-once

 

至少一次,有可能會有多次。如果 Producer 收到來自 Ack 的確認,則表示該訊息已經寫入到 Kafka 了,此時剛好是一次,也就是我們後面的 Exactly-once。

 

但是如果 Producer 超時或收到錯誤,並且 request.required.acks 配置的不是 -1,則會重試傳送訊息,客戶端會認為該訊息未寫入 Kafka。

 

如果 Broker 在傳送 Ack 之前失敗,但在訊息成功寫入 Kafka 之後,這一次重試將會導致我們的訊息會被寫入兩次。

 

所以訊息就不止一次地傳遞給最終 Consumer,如果 Consumer 處理邏輯沒有保證冪等的話就會得到不正確的結果。

 

在這種語義中會出現亂序,也就是當第一次 Ack 失敗準備重試的時候,但是第二訊息已經發送過去了,這個時候會出現單分割槽中亂序的現象。

 

我們需要設定 Prouducer 的引數 max.in.flight.requests.per.connection,flight.requests 是 Producer 端用來儲存傳送請求且沒有響應的佇列,保證 Produce r端未響應的請求個數為 1。

 

at-most-once

 

如果在 Ack 超時或返回錯誤時 Producer 不重試,也就是我們講 request.required.acks = -1,則該訊息可能最終沒有寫入 Kafka,所以 Consumer 不會接收訊息。

 

exactly-once

 

剛好一次,即使 Producer 重試傳送訊息,訊息也會保證最多一次地傳遞給 Consumer。該語義是最理想的,也是最難實現的。

 

在 0.10 之前並不能保證 exactly-once,需要使用 Consumer 自帶的冪等性保證。0.11.0 使用事務保證了。

 

如何實現 exactly-once

 

要實現 exactly-once 在 Kafka 0.11.0 中有兩個官方策略:

 

單 Producer 單 Topic

 

每個 Producer 在初始化的時候都會被分配一個唯一的 PID,對於每個唯一的 PID,Producer 向指定的 Topic 中某個特定的 Partition 傳送的訊息都會攜帶一個從 0 單調遞增的 Sequence Number。

 

在我們的 Broker 端也會維護一個維度為,每次提交一次訊息的時候都會對齊進行校驗:

  • 如果訊息序號比 Broker 維護的序號大一以上,說明中間有資料尚未寫入,也即亂序,此時 Broker 拒絕該訊息,Producer 丟擲 InvalidSequenceNumber。

  • 如果訊息序號小於等於 Broker 維護的序號,說明該訊息已被儲存,即為重複訊息,Broker 直接丟棄該訊息,Producer 丟擲 DuplicateSequenceNumber。

  • 如果訊息序號剛好大一,就證明是合法的。

 

上面所說的解決了兩個問題:

  • 當 Prouducer 傳送了一條訊息之後失敗,Broker 並沒有儲存,但是第二條訊息卻傳送成功,造成了資料的亂序。

  • 當 Producer 傳送了一條訊息之後,Broker 儲存成功,Ack 回傳失敗,Producer 再次投遞重複的訊息。

 

上面所說的都是在同一個 PID 下面,意味著必須保證在單個 Producer 中的同一個 Seesion 內,如果 Producer 掛了,被分配了新的 PID,這樣就無法保證了,所以 Kafka 中又有事務機制去保證。

 

事務

 

在 Kafka 中事務的作用是:

  • 實現 exactly-once 語義。

  • 保證操作的原子性,要麼全部成功,要麼全部失敗。

  • 有狀態的操作的恢復。

 

事務可以保證就算跨多個,在本次事務中的對消費佇列的操作都當成原子性,要麼全部成功,要麼全部失敗。

 

並且,有狀態的應用也可以保證重啟後從斷點處繼續處理,也即事務恢復。

 

在 Kafka 的事務中,應用程式必須提供一個唯一的事務 ID,即 Transaction ID,並且宕機重啟之後,也不會發生改變。

 

Transactin ID 與 PID 可能一一對應,區別在於 Transaction ID 由使用者提供,而 PID 是內部的實現對使用者透明。

 

為了 Producer 重啟之後,舊的 Producer 具有相同的 Transaction ID 失效,每次 Producer 通過 Transaction ID 拿到 PID 的同時,還會獲取一個單調遞增的 Epoch。

 

由於舊的 Producer 的 Epoch 比新 Producer 的 Epoch 小,Kafka 可以很容易識別出該 Producer 是老的,Producer 並拒絕其請求。

 

為了實現這一點,Kafka 0.11.0.0 引入了一個伺服器端的模組,名為 Transaction Coordinator,用於管理 Producer 傳送的訊息的事務性。

 

該 Transaction Coordinator 維護 Transaction Log,該 Log 存於一個內部的 Topic 內。

 

由於 Topic 資料具有永續性,因此事務的狀態也具有永續性。Producer 並不直接讀寫 Transaction Log,它與 Transaction Coordinator 通訊,然後由 Transaction Coordinator 將該事務的狀態插入相應的 Transaction Log。

 

Transaction Log 的設計與 Offset Log 用於儲存 Consumer 的 Offset 類似。

 

最後

 

關於訊息佇列或者 Kafka 的一些常見的面試題,通過上面的文章可以提煉出以下幾個比較經典的問題,大部分問題都可以從上面總結後找到答案:

  • 為什麼使用訊息佇列?訊息佇列的作用是什麼?

  • Kafka 的 Topic 和分割槽內部是如何儲存的,有什麼特點?

  • 與傳統的訊息系統相比,Kafka 的消費模型有什麼優點?

  • Kafka 如何實現分散式的資料儲存與資料讀取?

  • Kafka 為什麼比 RocketMQ 支援的單機 Partition 要少?

  • 為什麼需要分割槽,也就是說主題只有一個分割槽,難道不行嗎?

  • 日誌為什麼需要分段?

  • Kafka 是依靠什麼機制保持高可靠,高可用?

  • 訊息佇列如何保證訊息冪等?

  • 讓你自己設計個訊息佇列,你會怎麼設計,會考慮哪些方面?

  • 歡迎工作一到五年的Java工程師朋友們加入Java程式設計師開發: 854393687
    群內提供免費的Java架構學習資料(裡面有高可用、高併發、高效能及分散式、Jvm效能調優、Spring原始碼,MyBatis,Netty,Redis,Kafka,Mysql,Zookeeper,Tomcat,Docker,Dubbo,Nginx等多個知識點的架構資料)合理利用自己每一分每一秒的時間來學習提升自己,不要再用"沒有時間“來掩飾自己思想上的懶惰!趁年輕,使勁拼,給未來的自己一個交代!