1. 程式人生 > >《KAFKA官方文件》入門指南

《KAFKA官方文件》入門指南

《KAFKA官方文件》入門指南

1.入門指南

1.1簡介

Apache的Kafka™是一個分散式流平臺(a distributed streaming platform)。這到底意味著什麼?

我們認為,一個流處理平臺應該具有三個關鍵能力:

  1. 它可以讓你釋出和訂閱記錄流。在這方面,它類似於一個訊息佇列或企業訊息系統。
  2. 它可以讓你持久化收到的記錄流,從而具有容錯能力。
  3. 它可以讓你處理收到的記錄流。

 

Kafka擅長哪些方面?

它被用於兩大類應用:

  1. 建立實時流資料管道從而能夠可靠地在系統或應用程式之間的共享資料
  2. 構建實時流應用程式,能夠變換或者對資料
  3. 進行相應的處理。

想要了解Kafka如何具有這些能力,讓我們從下往上深入探索Kafka的能力。

首先,明確幾個概念:

  • Kafka是執行在一個或多個伺服器的叢集(Cluster)上的。
  • Kafka叢集分類儲存的記錄流被稱為主題(Topics)。
  • 每個訊息記錄包含一個鍵,一個值和時間戳。

Kafka有四個核心API:

  • 生產者 API 允許應用程式釋出記錄流至一個或多個Kafka的話題(Topics)。
  • 消費者API允許應用程式訂閱一個或多個主題,並處理這些主題接收到的記錄流。
  • Streams API允許應用程式充當流處理器(
    stream processor,從一個或多個主題獲取輸入流,並生產一個輸出流至一個或多個的主題,能夠有效地變換輸入流為輸出流。
  • Connector API允許構建和執行可重用的生產者或消費者,能夠把 Kafka主題連線到現有的應用程式或資料系統。例如,一個連線到關係資料庫的聯結器(connector)可能會獲取每個表的變化。

 

Kafka的客戶端和伺服器之間的通訊是靠一個簡單的,高效能的,與語言無關的TCP協議完成的。這個協議有不同的版本,並保持向後相容舊版本(向前相容舊版本?)。Kafka不光提供了一個Java客戶端,還有許多語言版本的客戶端。

主題和日誌

讓我們先來了解Kafka的核心抽象概念記錄流 – 主題。

主題是一種分類或釋出的一系列記錄的名義上的名字。Kafka的主題始終是支援多使用者訂閱的; 也就是說,一個主題可以有零個,一個或多個消費者訂閱寫入的資料。

對於每一個主題,Kafka叢集保持一個分割槽日誌檔案,看下圖:

每個分割槽是一個有序的,不可變的訊息序列,新的訊息不斷追加到這個有組織的有保證的日誌上。分割槽會給每個訊息記錄分配一個順序ID號 – 偏移量, 能夠唯一地標識該分割槽中的每個記錄。

Kafka叢集保留所有釋出的記錄,不管這個記錄有沒有被消費過,Kafka提供可配置的保留策略去刪除舊資料(還有一種策略根據分割槽大小刪除資料)。例如,如果將保留策略設定為兩天,在記錄公佈後兩天,它可用於消費,之後它將被丟棄以騰出空間。Kafka的效能跟儲存的資料量的大小無關, 所以將資料儲存很長一段時間是沒有問題的。

事實上,保留在每個消費者元資料中的最基礎的資料就是消費者正在處理的當前記錄的偏移量(offset)或位置(position)。這種偏移是由消費者控制:通常偏移會隨著消費者讀取記錄線性前進,但事實上,因為其位置是由消費者進行控制,消費者可以在任何它喜歡的位置讀取記錄。例如,消費者可以恢復到舊的偏移量對過去的資料再加工或者直接跳到最新的記錄,並消費從“現在”開始的新的記錄。

這些功能的結合意味著,實現Kafka的消費者的代價都是很小的,他們可以增加或者減少而不會對叢集或其他消費者有太大影響。例如,你可以使用我們的命令列工具去追隨任何主題,而且不會改變任何現有的消費者消費的記錄。

資料日誌的分割槽,一舉數得。首先,它們允許資料能夠擴充套件到更多的伺服器上去。每個單獨的分割槽的大小受到承載它的伺服器的限制,但一個話題可能有很多分割槽,以便它能夠支援海量的的資料。其次,更重要的意義是分割槽是進行並行處理的基礎單元。

分散式

日誌的分割槽會跨伺服器的分佈在Kafka叢集中,每個伺服器會共享分割槽進行資料請求的處理。每個分割槽可以配置一定數量的副本分割槽提供容錯能力。每個分割槽都有一個伺服器充當“leader”和零個或多個伺服器充當“followers”。 leader處理所有的讀取和寫入分割槽的請求,而followers被動的從領導者拷貝資料。如果leader失敗了,followers之一將自動成為新的領導者。每個伺服器可能充當一些分割槽的leader和其他分割槽的follower,這樣的負載就會在叢集內很好的均衡分配。

生產者

生產者釋出資料到他們所選擇的主題。生產者負責選擇把記錄分配到主題中的哪個分割槽。這可以使用輪詢演算法( round-robin)進行簡單地平衡負載,也可以根據一些更復雜的語義分割槽演算法(比如基於記錄一些鍵值)來完成。

消費者

消費者以消費群(consumer group 的名稱來標識自己,每個釋出到主題的訊息都會發送給訂閱了這個主題的消費群裡面的一個消費者的一個例項。消費者的例項可以在單獨的程序或單獨的機器上。

如果所有的消費者例項都屬於相同的消費群,那麼記錄將有效地被均衡到每個消費者例項。

如果所有的消費者例項有不同的消費群,那麼每個訊息將被廣播到所有的消費者程序。

兩個伺服器的Kafka叢集具有四個分割槽(P0-P3)和兩個消費群。A消費群有兩個消費者,B群有四個。

更常見的是,我們會發現主題有少量的消費群,每一個都是“邏輯上的訂閱者”。每組都是由很多消費者例項組成,從而實現可擴充套件性和容錯性。這只不過是釋出 – 訂閱模式的再現,區別是這裡的訂閱者是一組消費者而不是一個單一的程序的消費者。

Kafka消費群的實現方式是通過分割日誌的分割槽,分給每個Consumer例項,使每個例項在任何時間點的都可以“公平分享”獨佔的分割槽。維持消費群中的成員關係的這個過程是通過Kafka動態協議處理。如果新的例項加入該組,他將接管該組的其他成員的一些分割槽; 如果一個例項死亡,其分割槽將被分配到剩餘的例項。

Kafka只保證一個分割槽內的訊息有序,不能保證一個主題的不同分割槽之間的訊息有序。分割槽的訊息有序與依靠主鍵進行資料分割槽的能力相結合足以滿足大多數應用的要求。但是,如果你想要保證所有的訊息都絕對有序可以只為一個主題分配一個分割槽,雖然這將意味著每個消費群同時只能有一個消費程序在消費。

保證

Kafka提供了以下一些高級別的保證:

  • 由生產者傳送到一個特定的主題分割槽的訊息將被以他們被髮送的順序來追加。也就是說,如果一個訊息M1和訊息M2都來自同一個生產者,M1先發,那麼M1將有一個低於M2的偏移,會更早在日誌中出現。
  • 消費者看到的記錄排序就是記錄被儲存在日誌中的順序。
  • 對於副本因子N的主題,我們將承受最多N-1次伺服器故障切換而不會損失任何的已經儲存的記錄。

對這些保證的更多細節可以參考文件的設計部分。

Kafka作為訊息系統

如何將Kafka的流的概念和傳統的企業資訊系統作比較?

訊息處理模型歷來有兩種:佇列釋出-訂閱。在佇列模型中,一組消費者可以從伺服器讀取記錄,每個記錄都會被其中一個消費者處理; 在釋出-訂閱模式裡,記錄被廣播到所有的消費者。這兩種模式都具有一定的優點和弱點。佇列的優點是它可以讓你把資料分配到多個消費者去處理,它可以讓您擴充套件你的處理能力。不幸的是,佇列不支援多個訂閱者,一旦一個程序讀取了資料,這個資料就會消失。釋出-訂閱模式可以讓你廣播資料到多個程序,但是因為每一個訊息傳送到每個訂閱者,沒辦法對訂閱者處理能力進行擴充套件。

Kafka的消費群的推廣了這兩個概念。消費群可以像佇列一樣讓訊息被一組程序處理(消費群的成員),與釋出 – 訂閱模式一樣,Kafka可以讓你傳送廣播訊息到多個消費群。

Kafka的模型的優點是,每個主題都具有這兩個屬性,它可以擴充套件處理能力,也可以實現多個訂閱者,沒有必要二選一。

Kafka比傳統的訊息系統具有更強的訊息順序保證的能力。

傳統的訊息佇列的訊息在佇列中是有序的,多個消費者從佇列中消費訊息,伺服器按照儲存的順序派發訊息。然而,儘管伺服器是按照順序派發訊息,但是這些訊息記錄被非同步傳遞給消費者,消費者接收到的訊息也許已經是亂序的了。這實際上意味著訊息的排序在並行消費中都將丟失。訊息系統通常靠 “排他性消費”( exclusive consumer)來解決這個問題,只允許一個程序從佇列中消費,當然,這意味著沒有並行處理的能力。

Kafka做的更好。通過一個概念:並行性-分割槽-主題實現主題內的並行處理,Kafka是能夠通過一組消費者的程序同時提供排序保證和負載均衡。每個主題的分割槽指定給每個消費群中的一個消費者,使每個分割槽只由該組中的一個消費者所消費。通過這樣做,我們確保消費者是一個分割槽唯一的讀者,從而順序的消費資料。因為有許多的分割槽,所以負載還能夠均衡的分配到很多的消費者例項上去。但是請注意,一個消費群的消費者例項不能比分割槽數量多。

Kafka作為儲存系統

任何訊息佇列都能夠解耦訊息的生產和消費,還能夠有效地儲存正在傳送的訊息。Kafka與眾不同的是,它是一個非常好的儲存系統。

Kafka把訊息資料寫到磁碟和備份分割槽。Kafka允許生產者等待返回確認,直到副本複製和持久化全部完成才認為成功,否則則認為寫入伺服器失敗。

Kafka使用的磁碟結構很好擴充套件,Kafka將執行相同的策略不管你是有50 KB或50TB的持久化資料。

由於儲存的重要性,並允許客戶控制自己的讀取位置,你可以把Kafka認為是一種特殊用途的分散式檔案系統,致力於高效能,低延遲的有保障的日誌儲存,能夠備份和自我複製。

Kafka流處理

只是讀,寫,以及儲存資料流是不夠的,目的是能夠實時處理資料流。

在Kafka中,流處理器是從輸入的主題連續的獲取資料流,然後對輸入進行一系列的處理,並生產連續的資料流到輸出主題。

例如,零售應用程式可能需要輸入銷售和出貨量,根據輸入資料計算出重新訂購的數量和調整後的價格,然後輸出到主題。

這些簡單處理可以直接使用生產者和消費者的API做到。然而,對於更復雜的轉換Kafka提供了一個完全整合的流API。這允許應用程式把一些重要的計算過程從流中剝離或者加入流一起。

這種設施可幫助解決這類應用面臨的難題:處理雜亂的資料,改變程式碼去重新處理輸入,執行有狀態的計算等

流API建立在Kafka提供的核心基礎單元之上:它使用生產者和消費者的API進行輸入輸出,使用Kafka儲存有狀態的資料,並使用群組機制在一組流處理例項中實現容錯。

把功能組合起來

訊息的傳輸,儲存和流處理的組合看似不尋常卻是Kafka作為流處理平臺的關鍵。

像HDFS分散式檔案系統,允許儲存靜態檔案進行批量處理。像這樣的系統允許儲存和處理過去的歷史資料

傳統的企業訊息系統允許處理您訂閱後才抵達的訊息。這樣的系統只能處理將來到達的資料。

Kafka結合了這些功能,這種結合對Kafka作為流應用平臺以及資料流處理的管道至關重要。

通過整合儲存和低延遲訂閱,流處理應用可以把過去和未來的資料用相同的方式處理。這樣一個單獨的應用程式,不但可以處理歷史的,儲存的資料,當它到達最後一條記錄不會停止,繼續等待處理未來到達的資料。這是泛化了的的流處理的概念,包括了批處理應用以及訊息驅動的應用。

同樣,流資料處理的管道結合實時事件的訂閱使人們能夠用Kafka實現低延遲的管道; 可靠的儲存資料的能力使人們有可能使用它傳輸一些重要的必須保證可達的資料。可以與一個定期載入資料的線下系統整合,或者與一個因為維護長時間下線的系統整合。流處理的元件能夠保證轉換(處理)到達的資料。

有關Kafka提供的保證,API和功能的更多資訊,看其餘檔案

1.2使用案例

下面描述了一些使用Apache Kafka™的流行用例。更多的關於這些領域實踐的概述,參考這個部落格

訊息

Kafka能夠很好的替代傳統的訊息中介軟體。訊息中介軟體由於各種原因被使用(解耦資料的生產和消費,緩衝未處理的訊息等)。相較於大多數訊息處理系統,Kafka有更好的吞吐量,內建分割槽,副本複製和容錯性,使其成為大規模訊息處理應用的理想解決方案。

根據我們的經驗訊息的使用通常具有相對低的吞吐量,但可能需要端到端的低延遲,以及高可靠性的保證,這種低延遲和可靠性的保證恰恰是Kafka能夠提供的。

在這一領域Kafka是能夠和傳統的訊息系統相媲美的,例如ActiveMQ或 RabbitMQ

網站活動跟蹤

最初的用例是用Kafka重建一個使用者活動跟蹤管道使之作為一組實時釋出 – 訂閱的資料來源。這意味著網站活動(網頁瀏覽,搜尋,或其他可能的操作)被當作一組中心主題釋出,每種活動被當作一個主題。這些資料來源(feeds)可被一系列的應用訂閱,包括實時處理,實時監測,載入到Hadoop系統或離線資料倉庫系統進行離線處理和報告。

活動追蹤通常會產生巨大的資料量,因為每個使用者頁面的瀏覽都會產生很多的活動訊息。

測量

Kafka通常用於監測資料的處理。這涉及從分散式應用程式聚集統計資料,生產出集中的執行資料來源feeds(以便訂閱)。

日誌聚合

許多人用Kafka作為日誌聚合解決方案的替代品。日誌聚合通常從伺服器收集物理日誌檔案,並把它們放在一個集中的地方(檔案伺服器或HDFS)進行處理。Kafka抽象了檔案的詳細資訊,把日誌或事件資料的簡潔抽象作為訊息流傳輸。這為低時延的處理提供支援,而且更容易支援多個數據源和分散式的資料消費。相比集中式的日誌處理系統,Scribe or Flume,Kafka提供同樣良好的效能,而且因為副本備份提供了更強的可靠性保證和更低的端到端延遲。

流處理

Kafka的流資料管道在處理資料的時候包含多個階段,其中原始輸入資料從Kafka主題被消費然後彙總,加工,或轉化成新主題用於進一步的消費或後續處理。例如,用於推薦新聞文章的資料流處理管道可能從RSS源抓取文章內容,並將其釋出到“文章”主題; 進一步的處理可能是標準化或刪除重複資料,然後釋出處理過的文章內容到一個新的話題; 最後的處理階段可能會嘗試推薦這個內容給使用者。這樣的資料流處理管道基於各個主題建立了實時資料資料流程圖。從版本0.10.0.0開始,Apache Kafka加入了輕量級的但功能強大的流處理庫Kafka Streams ,Kafka Streams支援如上所述的資料處理。除了Kafka Streams,可以選擇的開源流處理工具包括 Apache Storm and Apache Samza.

Event Sourcing

Event sourcing 是一種應用程式設計風格,是按照時間順序記錄的狀態變化的序列。Kafka的非常強大的儲存日誌資料的能力使它成為構建這種應用程式的極好的後端選擇。

Commit Log

Kafka可以為分散式系統提供一種外部提交日誌(commit-log)服務。日誌有助於節點之間複製資料,並作為一種資料重新同步機制用來恢復故障節點的資料。Kafka的log compaction 功能有助於支援這種用法。Kafka在這種用法中類似於Apache BookKeeper 專案。

1.3快速開始

本教程假設你從零開始,沒有現成的Kafka或ZooKeeper資料。由於Kafka控制檯指令碼在Unix基礎的和Windows平臺上的不同,在Windows平臺上使用bin\windows\,而不是bin/,並修改指令碼擴充套件為.bat。

1步:下載程式碼

下載0.10.2.0釋放和un-tar它。

> tar -xzf kafka_2.11-0.10.2.0.tgz
> cd kafka_2.11-0.10.2.0

2步:啟動伺服器

Kafka使用ZooKeeper的,所以你需要先啟動ZooKeeper的伺服器,如果你還沒有,您可以使用Kafka包裝裡的方便指令碼來得到一個快速和汙染的單節點的ZooKeeper例項。

> bin/zookeeper-server-start.sh config/zookeeper.properties
[2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
...

現在啟動Kafka伺服器:

> bin/kafka-server-start.sh config/server.properties
[2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)
[2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
...

3步:建立一個話題

讓我們建立一個名為“test”主題,只有一個分割槽,只有一個副本:

> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

現在我們可以看到,如果我們執行的列表主題命令話題:

> bin/kafka-topics.sh --list --zookeeper localhost:2181
test

除了手動建立主題,你還可以配置你的代理伺服器(broker),當一個不存在的主題被髮布的時候它能自動建立相應的主題。

4步:傳送一些訊息

Kafka帶有一個命令列客戶端,獲取從檔案或來自標準輸入的輸入,並作為訊息傳送到Kafka叢集。預設情況下,每一行將被作為單獨的訊息傳送。

執行生產者指令碼,然後輸入一些資訊到控制檯傳送到伺服器。

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
This is another message

5步:啟動消費者

Kafka也有一個命令列消費者,將收到的訊息輸出到標準輸出。

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
This is a message
This is another message

如果你在不同的終端上執行上面的命令,那麼你現在應該能看到從生產者終端輸入的訊息會出現在消費者終端。

所有的命令列工具都有其他選項; 不帶引數執行命令將顯示更加詳細的使用資訊。

6步:設定多代理群集

到目前為止,我們已經運行了單個代理的伺服器,但是這沒有樂趣。對於Kafka,一個代理是隻有一個單節點的叢集,因此多代理叢集只是比開始多了一些代理例項外,沒有什麼太大的變化。但只是為了感受一下,我們的叢集擴充套件到三個節點(所有的節點還是在本地機器上)。

首先,我們為每個經紀人做一個配置檔案(在Windows上使用copy命令來代替):

> cp config/server.properties config/server-1.properties
> cp config/server.properties config/server-2.properties

現在,編輯這些新檔案和設定以下屬性:

config/server-1.properties:
    broker.id=1
    listeners=PLAINTEXT://:9093
    log.dir=/tmp/kafka-logs-1

config/server-2.properties:
    broker.id=2
    listeners=PLAINTEXT://:9094
    log.dir=/tmp/kafka-logs-2

該broker.id屬性是叢集中的每個節點的唯一和永久的名字。我們要重寫埠和日誌目錄,因為我們都在同一臺機器上執行這些代理,我們要防止經紀人在同一埠上註冊或覆蓋彼此的資料。

我們已經有Zookeeper服務和我們的單個節點服務,所以我們只需要啟動兩個新節點:

> bin/kafka-server-start.sh config/server-1.properties &
...
> bin/kafka-server-start.sh config/server-2.properties &
...

現在,建立一個新的具有三個的副本因子的主題:

> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic

好了,現在我們有一個叢集,但是如何才能知道哪個代理節點在做什麼?要檢視執行“describe topics”命令:

> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic	PartitionCount:1	ReplicationFactor:3	Configs:
	Topic: my-replicated-topic	Partition: 0	Leader: 1	Replicas: 1,2,0	Isr: 1,2,0

下面是輸出的解釋。第一行給出了所有分割槽的摘要,每個附加的行提供了一個分割槽的資訊。由於我們只有一個分割槽,所以這個主題只有一行。

  • “Leader”,負責指定分割槽所有讀取和寫入的節點。每個節點將是一部分隨機選擇的分割槽中的領導者。
  • “Replicas”是此分割槽日誌的節點列表集合,不管這些節點是否是領導者或者只是還活著(不在in-sync狀態)。
  • “ISR”是一組”in-sync” 節點列表的集合。這個列表包括目前活著並跟leader保持同步的replicas,Isr 是Replicas的子集。

請注意,在我的例子節點1是該主題的唯一分割槽中的leader。

我們可以執行相同的命令看看我們建立原來的話題的狀態:

> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
Topic:test	PartitionCount:1	ReplicationFactor:1	Configs:
	Topic: test	Partition: 0	Leader: 0	Replicas: 0	Isr: 0

所以毫不奇怪,原來的話題沒有副本,只有我們建立它時的唯一的伺服器0。

讓我們釋出一些訊息到我們新的話題:

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
...
my test message 1
my test message 2
^C

現在讓我們來消費這些訊息:

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
...
my test message 1
my test message 2
^C

現在,讓我們測試容錯性。代理1是領導者,讓我們殺死它:

> ps aux | grep server-1.properties
7564 ttys002    0:15.91 /System/Library/Frameworks/JavaVM.framework/Versions/1.8/Home/bin/java...
> kill -9 7564

在Windows上使用:

> wmic process get processid,caption,commandline | find "java.exe" | find "server-1.properties"
java.exe    java  -Xmx1G -Xms1G -server -XX:+UseG1GC ... build\libs\kafka_2.10-0.10.2.0.jar"  kafka.Kafka config\server-1.properties    644
> taskkill /pid 644 /f

領導權已經切換到備機中的一個節點上去了,節點1不再在同步中的副本集(in-sync replica set)中:

> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic	PartitionCount:1	ReplicationFactor:3	Configs:
	Topic: my-replicated-topic	Partition: 0	Leader: 2	Replicas: 1,2,0	Isr: 2,0

但訊息仍然是可用於消費,即使是原來負責寫任務的領導者已經不在了:

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
...
my test message 1
my test message 2
^C

7步:使用Kafka連線匯入/匯出資料

從控制檯寫入資料和寫回控制檯是一個很方便入門的例子,但你可能想用Kafka使用其他來源的資料或匯出Kafka的資料到其他系統。相對於許多系統需要編寫定製整合的程式碼,您可以使用Kafka連線到系統去匯入或匯出資料。

Kafka Connect是包括在Kafka中一個工具,用來匯入匯出資料到Kafka。它是connectors的一個可擴充套件工具,其執行定製邏輯,用於與外部系統互動。在這個快速入門,我們將看到如何使用Kafka Connect做一些簡單的聯結器從一個檔案匯入資料到Kafka的主題,和將主題資料匯出到一個檔案。

首先,我們需要建立一些原始資料來開始測試:

> echo -e "foo\nbar" > test.txt

接下來,我們將啟動兩個執行在獨立模式的聯結器,這意味著他們在一個單一的,區域性的,專用的程序中執行。我們提供三個配置檔案作為引數。第一始終是Kafka連線過程中的公共配置,如要連線到的Kafka的代理伺服器的配置和資料的序列化格式的配置。剩餘的每個配置檔案用來建立指定的聯結器。這些檔案包括一個唯一的聯結器名稱,需要例項化的聯結器類,還有建立該聯結器所需的其他配置。

> bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties

用這些Kafka的示例配置檔案,使用前面已經啟動的本地群集的預設配置,建立兩個聯結器:第一是一個源聯結器,其從輸入檔案中讀取每行的內容,釋出到的Kafka主題和第二個是一個sink聯結器負責從Kafka主題讀取訊息,生產出的訊息按行輸出到檔案。

在啟動過程中,你會看到一些日誌資訊,包括一些表明該聯結器被例項化的資訊。一旦Kafka Connect程序已經開始,源聯結器應該開始從test.txt讀取每行的訊息,並將其生產釋出到主題connect-test,而sink聯結器應該從主題connect-test讀取訊息,並將其寫入檔案test.sink.txt。我們可以通過檢查輸出檔案的內容來驗證資料都已通過整個管道輸送:

> cat test.sink.txt
foo
bar

請注意,資料被儲存在Kafka主題的connect-test中,所以我們也可以執行控制檯消費者消費主題中的資料(或使用定製的消費者程式碼來處理它):

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}
...

聯結器不停的處理資料,因此我們可以將資料新增到該檔案,並能看到資料通過管道移動:

> echo "Another line" >> test.txt

您應該看到一行訊息出現在控制檯消費者的控制檯和sink檔案中。

8步:使用Kafka Streams處理資料

Kafka Streams 是Kafka的客戶端庫, 用來做實時流處理和分析儲存在Kafka代理伺服器的資料。該快速入門例子將演示如何執行這個流應用庫。這裡是要點WordCountDemo的示例程式碼(轉換為方便閱讀的Java 8 lambda表示式)。

// Serializers/deserializers (serde) for String and Long types
final Serde<String> stringSerde = Serdes.String();
final Serde<Long> longSerde = Serdes.Long();

// Construct a `KStream` from the input topic ""streams-file-input", where message values
// represent lines of text (for the sake of this example, we ignore whatever may be stored
// in the message keys).
KStream<String, String> textLines = builder.stream(stringSerde, stringSerde, "streams-file-input");

KTable<String, Long> wordCounts = textLines
    // Split each text line, by whitespace, into words.
    .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))

    // Group the text words as message keys
    .groupBy((key, value) -> value)

    // Count the occurrences of each word (message key).
    .count("Counts")

// Store the running counts as a changelog stream to the output topic.
wordCounts.to(stringSerde, longSerde, "streams-wordcount-output");

它實現了單詞計數演算法,計算輸入文字中一個單詞的出現次數。然而,與其他單詞計數的演算法不同,其他的演算法一般都是對有界資料進行操作,該演算法演示應用程式的表現略有不同,因為他可以被設計去操作無限的,無界的流資料。和操作有界資料的演算法相似,它是一個有狀態的演算法,可以跟蹤和更新單詞的計數。然而,因為它必須承擔潛在的無界輸入資料的處理,它會週期性地輸出其當前狀態和結果,同時繼續處理更多的資料,因為它無法知道他有沒有處理完“所有”的輸入資料。

作為第一步驟,我們將準備好輸入到Kafka主題的資料,隨後由Kafka Streams應用程式進行處理。

> echo -e "all streams lead to kafka\nhello kafka streams\njoin kafka summit" > file-input.txt

或在Windows上:

> echo all streams lead to kafka> file-input.txt
> echo hello kafka streams>> file-input.txt
> echo|set /p=join kafka summit>> file-input.txt

接下來,我們使用控制檯生產者把輸入的資料傳送到主題名streams-file-input 的主題上,其內容從STDIN一行一行的讀取,並一行一行的釋出到主題,每一行的訊息都有一個空鍵和編碼後的字串(在實踐中,當應用程式將啟動並執行後,流資料很可能會持續流入Kafka):

> bin/kafka-topics.sh --create \
            --zookeeper localhost:2181 \
            --replication-factor 1 \
            --partitions 1 \
            --topic streams-file-input
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-file-input < file-input.txt

現在,我們可以執行單詞計數應用程式來處理輸入資料:

> bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo

演示應用程式將從輸入主題streams-file-input讀取資料,對讀取的訊息的執行單詞計數演算法,並且持續寫入其當前結果到輸出主題streams-wordcount-output。因此,除了寫回Kafka的日誌條目,不會有任何的STDOUT輸出。該演示將執行幾秒鐘,與典型的流處理應用不同,演示程式會自動終止。

現在,我們通過讀取輸出主題的輸出得到單詞計數演示程式的結果:

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
            --topic streams-wordcount-output \
            --from-beginning \
            --formatter kafka.tools.DefaultMessageFormatter \
            --property print.key=true \
            --property print.value=true \
            --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
            --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

下面的資料會被輸出到控制檯:

all     1
lead    1
to      1
hello   1
streams 2
join    1
kafka   3
summit  1

這裡,第一列是java.lang.String型別的訊息健,而第二列是java.lang.Long型訊息值。注意,這裡的輸出其實是資料更新的連續流,每個資料記錄(上面的例子裡的每行的輸出)都有一個單詞更新後的數目值,例如“Kafka”作為鍵的記錄。對於具有相同鍵的多個記錄,每個後面的記錄都是前一個記錄的更新。

下面的兩個圖說明什麼發生在幕後的過程。第一列顯示的當前狀態的變化,用KTable<String, Long>來統計單詞出現的數目。第二列顯示KTable狀態更新導致的發生變化的記錄,這個變化的記錄被髮送到輸出Kafka主題streams-wordcount-output

首先, “all streams lead to kafka”這樣一行文字正在被處理。當新的單詞被處理的時候,KTable會增加一個新的表項(以綠色背景高亮顯示),並有相應的變化記錄傳送到下游KStream。

當第二行“hello kafka streams”被處理的時候,我們觀察到,現有的KTable中的表項第一次被更新(這裡: 單詞 “kafka” 和 “streams”)。再次,改變的記錄被髮送到輸出話題。

以此類推(我們跳過的第三行是如何被處理的插圖)。這就解釋了為什麼輸出主題有我們上面例子顯示的內容,因為它包含了完整的更改記錄。

跳出這個具體的例子我們從整體去看, Kafka流利用表和日誌變化(changelog)流之間的二元性(here: 表= the KTable, 日誌變化流 = the downstream KStream):你可以釋出的每一個表的變化去一個流,如果你從開始到結束消費了整個的日誌變化(changelog)流,你可以重建表的內容。

現在,你可以寫更多的輸入資訊到streams-file-input主題,並觀察更多的資訊加入到了 streams-wordcount-output主題,反映了更新後的單詞數目(例如,使用上述的控制檯生產者和控制檯消費者)。

您可以通過Ctrl-C 停止控制檯消費者。

1.4生態系統

除了Kafka的主要版本之外,還有很多應用集成了Kafka工具。該生態系統頁面中列出的許多工具,包括流處理系統,Hadoop的整合,監控和部署工具。

1.5從以前版本升級

0.8.40.9.x0.10.0.x0.10.1.x升級到0.10.2.0

0.10.2.0的有線協議有變化。通過下面的推薦滾動升級計劃,你能保證在升級過程中無需停機。但是,請在升級之前檢視0.10.2.0版本顯著的變化

從0.10.2版本開始,Java客戶端(生產者和消費者)已獲得與舊版本代理伺服器溝通的能力。版本0.10.2客戶可以跟0.10.0版或更新版本的代理溝通。但是,如果你的代理比0.10.0老,你必須在升級客戶端之前升級Kafka叢集中的所有代理伺服器(Broker)。版本0.10.2代理支援0.8.x和更新的客戶端。

對於滾動升級:

  1. 更新所有代理伺服器上的server.properties檔案,新增以下屬性:
    • inter.broker.protocol.version = CURRENT_KAFKA_VERSION(例如0.8.2,0.9.0,0.10.0或0.10.1)。
    • log.message.format.version = CURRENT_KAFKA_VERSION(參見升級後的潛在效能的影響瞭解此配置做什麼的詳細資訊。)
  2. 逐一升級代理:關閉代理,更新程式碼,並重新啟動。
  3. 一旦整個群集升級成功,通過編輯inter.broker.protocol.version將其設定為0.10.2的協議版本。
  4. 如果您以前的訊息格式為0.10.0,改變log.message.format.version至0.10.2(這是一個無效操作,因為0.10.0,0.10.1和0.10.2的訊息格式相同)。如果您以前的訊息格式版本低於0.10.0,不要改變log.message.format.version – 這個引數只能在所有的消費者都已經升級到0.10.0.0或更高版本之後改動。
  5. 逐一重新啟動代理伺服器使新協議版本生效。
  6. 如果這時log.message.format.version仍比0.10.0低,等到所有的消費者都已經升級到0.10.0或更高版本,然後更改每個代理伺服器的log.message.format.version到0.10.2,然後逐一重新啟動。

注意:如果你願意接受宕機,你可以簡單地把所有的代理伺服器關閉,更新程式碼,然後重新啟動他們。他們將預設使用新的協議。

注:改變協議版本並重新啟動可以在代理伺服器升級之後的任何時間做,沒有必要必須立刻就做。

升級0.10.1版本的Kafka流應用

  • 從0.10.1升級您的流應用程式到0.10.2不需要升級代理。0.10.2 Kafka流應用程式可以連線到0.10.2和0.10.1代理(但無法連線到 0.10.0的代理)。
  • 你需要重新編譯程式碼。只是替換Kafka流的jar檔案將無法正常工作,這破壞你的應用程式。
  • 如果您使用自定義(即使用者實現的)的時間戳提取,則需要更新此程式碼,因為TimestampExtractor介面改變了。
  • 如果您註冊了自定義指標,您將需要更新此程式碼,因為StreamsMetric介面被改變了。
  • 0.10.2 流 API的變化更多的細節。

0.10.2.1顯著的變化

  • 對於StreamsConfig類的兩個配置的預設值的修改提高了Kafka流應用的彈性。內部Kafka流生產者retries預設值從0變化到10,內部Kafka流消費者max.poll.interval.ms 預設值從300000到改變Integer.MAX_VALUE。

0.10.2.0顯著的變化

  • 在Java客戶端(生產者和消費者)已獲得與舊版本代理溝通的能力。版本0.10.2客戶端可以跟0.10.0版或更新版本的代理溝通。請注意,某些功能在跟就代理溝通的時候不可用或被限制了。
  • 在Java消費者中有幾種方法現在可能丟擲InterruptException如果呼叫執行緒被中斷。請參閱KafkaConsumer的Javadoc,對這種變化有一個更深入的解釋。
  • Java的消費者現在被恰當關閉。預設情況下,消費者會等待30秒才能完成掛起的請求。一個帶有timeout引數的新的API已新增到KafkaConsumer去控制最大等待時間。
  • 用逗號分隔的多個正則表示式可以傳遞多個Java消費者給MirrorMaker–whitelist選擇。這使得與MirrorMaker使用老Scala消費者時的行為一致。
  • 從0.10.1升級您的流應用程式0.10.2不需要代理伺服器升級。Kafka 0.10.2流應用程式可以連線到0.10.2和0.10.1代理(但無法連線到0.10.0代理)。
  • Zookeeper的依賴從流API中刪除。流API現在使用Kafka協議來管理內部主題,而不是直接修改動物園管理員的主題。這消除了需要直接訪問Zookeeper的特權,而“StreamsConfig.ZOOKEEPER_CONFIG”也不需要在流應用被設定。如果Kafka叢集是安全認證的,流應用程式必須具備必要的安全許可權才可以建立新的主題。
  • 一些新的引數,包括“security.protocol”, “connections.max.idle.ms”, “retry.backoff.ms”, “reconnect.backoff.ms”和“request.timeout.ms”新增到StreamsConfig類。如果使用者需要設定這些,要注意這些預設值。欲瞭解更多詳情,請參閱3.5Kafka流CONFIGS
  • 該offsets.topic.replication.factor代理的配置現在在主題生產中強制使用。直到叢集的大小符合這個複製因子要求,否則,主題的生產將失敗,返回GROUP_COORDINATOR_NOT_AVAILABLE錯誤。

新的協議版本

  • KIP-88:OffsetFetchRequest v2支援偏移檢索所有的主題,如果topics陣列設定為null。
  • KIP-88:OffsetFetchResponse V2引入了頂級error_code域。
  • KIP-103:UpdateMetadataRequest v3引入一個listener_name欄位到end_points陣列中的元素。
  • KIP-108:CreateTopicsRequest V1引入了一個validate_only引數。
  • KIP-108:CreateTopicsResponse V1引入了error_message到陣列topic_errors的元素。

0.8.40.9.x版本或0.10.0.X升級到0.10.1.0

0.10.1.0有線協議發生了變化。通過下面的推薦滾動升級計劃,能保證在升級過程中無需停機。但是,請注意在升級之前仔細閱讀0.10.1.0潛在的重大更改
注意:由於新協議的引入,它是升級你的客戶端之前請先完成Kafka叢集的升級(即0.10.1.x客戶端僅支援0.10.1.x或更高版本的代理,但0.10.1.x的代理可以支援舊版本客戶端)。

對於滾動升級:

  1. 更新所有代理上的server.properties檔案,並新增以下屬性:
    • inter.broker.protocol.version = CURRENT_KAFKA_VERSION(例如0.8.2.0,0.9.0.0或0.10.0.0)。
    • log.message.format.version = CURRENT_KAFKA_VERSION(參見升級後的潛在效能的影響對於此配置做什麼的詳細資訊。)
  2. 升級代理伺服器一次一個:關閉代理,更新程式碼,並重新啟動。
  3. 一旦整個群集升級完成,通過編輯inter.broker.protocol.version並將其設定為0.10.1.0的協議版本。
  4. 如果您以前的訊息格式為0.10.0,改變log.message.format.version至0.10.1(這是一個無效操作,如果0.10.0和0.10.1兩個協議的訊息格式相同)。如果您以前的訊息格式版本低於0.10.0,不要改變log.message.format.version — 這個引數只能在所有的消費者都已經升級到0.10.0.0或更高版本之後修改。
  5. 逐一重新啟動代理,新版本協議生效。
  6. 如果log.message.format.version仍比0.10.0低,等到所有的消費者都已經升級到0.10.0或更高版本,然後更改log.message.format.version到0.10.1,逐一重新啟動代理伺服器。

注意:如果你願意接受宕機,你可以簡單地把所有的代理伺服器關閉,更新程式碼,然後重新啟動他們。他們將預設使用新的協議。

注:改變協議版本並重新啟動可以在代理伺服器升級之後的任何時間做,沒有必要必須立刻就做。

0.10.1.0的重大更改

  • 日誌保留時間不再基於日誌段的最後修改時間。相反,它會基於日誌段裡擁有最大的時間戳的訊息。
  • 日誌滾動時間不再取決於日誌段建立時間。相反,它現在是基於訊息的時間戳。進一步來說,如果日誌段中第一個訊息的時間戳是T,當一個新的訊息具有的時間戳大於或等於T + log.roll.m,該日誌將被覆蓋。
  • 0.10.0的開啟檔案的處理程式將增加〜33%,因為每個日誌段增加的時間索引檔案。
  • 時間索引和偏移索引共享相同的索引大小的配置。因為時間索引條目大小是1.5倍偏移索引條目的大小。使用者可能需要增加log.index.size.max.bytes以避免潛在的頻繁的日誌滾動。
  • 由於增加的索引檔案,在某些代理伺服器上具有大量的日誌段(例如> 15K),代理啟動期間日誌載入過程可能很長。根據我們的實驗,num.recovery.threads.per.data.dir設定為1可減少日誌裝載時間。

升級0.10.0Kafka流應用

  • 從0.10.0升級您的流應用程式到0.10.1確實需要一個代理的升級,因為Kafka 0.10.1的流應用程式只能連線到0.10.1代理。
  • 有幾個API的變化不向後相容(參見流API在0.10.1的變化有詳細介紹)。因此,你需要更新和重新編譯程式碼。只是交換了Kafka流庫的jar檔案將無法正常工作,並會破壞你的應用程式。

0.10.1.0顯著的變化

  • 新的Java消費者不是beta版了,我們推薦它做新的應用開發。老Scala消費者仍然支援,但他們會在未來的版本中將會棄用,並將在未來的主版本中刪除。
  • 在使用像MirrorMaker和控制檯消費者新建消費者的過程中–new-consumer/ –new.consumer開關不再被需要; 一個簡單地使用是通過一個Kafka代理去連線,而不是Zookeeper的合集。此外,控制檯消費者去連線舊版本的消費者已被棄用,並將在未來的主版本中刪除。
  • Kafka叢集現在可以通過一個叢集ID被唯一標識。其會在一個代理升級到0.10.1.0時自動生成。叢集ID經由kafka.server可用:type= KafkaServer,name= ClusterId metric ,它是所述元資料響應的一部分。序列器,客戶端攔截器和度量報告可以通過實現ClusterResourceListener介面接收叢集ID。
  • BrokerState “RunningAsController”(值4)已被刪除。由於一個bug,代理在轉換狀態之前只會簡單的這種狀態下,因此去除的影響應該很小。一種推薦的檢測方法是一個給定的代理的控制器是由kafka.controller實現:type=KafkaController,name=ActiveControllerCount metric。
  • 新的Java消費者現在可以允許使用者通過時間戳在分割槽上搜索偏移量(offset)。
  • 新的Java消費者現在可以從後臺執行緒支援心跳檢查。有一個新的配置 max.poll.interval.ms,它控制消費者會主動離開組(5分鐘預設情況下)之前輪詢呼叫的最大時間。配置的值 request.timeout.ms必須始終大於max.poll.interval.ms因為這是一個JoinGroup請求可以在伺服器上被阻止到消費者被負載均衡之前的最長時間.所以我們可以改變預設值為剛好超過5分鐘。最後,預設值session.timeout.ms已調整到10秒,預設值max.poll.records已更改為500。
  • 當授權者和使用者沒有說明某個主題的授權,代理將不再返回TOPIC_AUTHORIZATION_FAILED給請求,因為這會洩漏主題名稱。相反,UNKNOWN_TOPIC_OR_PARTITION錯誤程式碼將被返回。使用Kafka生產者和消費者通常會在收到未知的主題錯誤時自動重試,這可能會導致意外的超時或延遲。如果你懷疑這種情況發生了,你可以檢視客戶端的log去檢查。
  • 獲取返回有預設的大小限制(消費者50 MB和副本的複製10 MB)。現有的每個分割槽的限制也適用(消費者和副本複製為1 MB)。請注意,這些限制都不是絕對最大值,在下一個要點有解釋。
  • 消費者和副本可以繼續進行,如果發現一個訊息大於返回/分割槽大小的限制。更具體地,如果在非空的分割槽上提取的第一個訊息比任一個或兩個限值大,仍然會被返回。
  • 過載的建構函式加入到kafka.api.FetchRequest和kafka.javaapi.FetchRequest允許呼叫者指定分割槽順序(因為順序在V3是很重要的)。先前存在的建構函式被棄用,在傳送請求以避免飢餓問題之前,分割槽會被洗牌。

新的協議版本

  • ListOffsetRequest V1支援精確的基於時間戳的偏移搜尋。
  • MetadataResponse V2引入了一個新的引數: “CLUSTER_ID”。
  • FetchRequest v3支援限制請求返回的大小(除了現有的每個分割槽的限制),它能夠返回比限制更大的訊息和在請求中加入分割槽的順序具有重要意義。
  • JoinGroup V1引入了一個新的欄位: “rebalance_timeout”。

升級0.8.40.9.x版本到0.10.0.0

0.10.0.0具有的潛在的重大更改(請在升級前仔細檢查更改)和 在升級後的效能影響。通過下面的推薦滾動升級計劃,能保證不宕機,不影響效能和隨後的升級。
注意:由於新協議的引入,升級客戶端之前升級您的Kafka叢集是很重要的。

注意0.9.0.0版本的客戶端:由於0.9.0.0引入了一個錯誤,即依賴於ZooKeeper的客戶(老Scala高層次消費者和與老消費者一起使用的MirrorMaker)不能和0.10.0.x代理一起工作。因此,代理都升級到0.10.0.x之前, 0.9.0.0客戶端應升級到0.9.0.1 . 這一步對0.8.4或0.9.0.1客戶端沒有必要。

對於滾動升級:

  1. 更新所有代理伺服器的server.properties檔案,並新增以下屬性:
    • inter.broker.protocol.version = CURRENT_KAFKA_VERSION(例如0.8.2或0.9.0.0)。
    • log.message.format.version = CURRENT_KAFKA_VERSION(參見升級後的潛在效能的影響對於此配置做什麼的詳細資訊。)
  2. 升級代理。這可以通過簡單地將其關機,更新程式碼,並重新啟動實現。
  3. 一旦整個群集升級結束,通過編輯inter.broker.protocol.version並將其設定為0.10.0.0的協議版本。注意:您不應該修改log.message.format.version — 這個引數只能在所有的消費者都已經升級到0.10.0.0之後再修改。
  4. 逐一重新啟動代理,新協議版本生效。
  5. 一旦所有的消費者都已經升級到0.10.0,逐一修改log.message.format.version至0.10.0和重啟代理伺服器。

注意:如果你願意接受宕機,你可以簡單地把所有的代理伺服器關閉,更新程式碼,然後重新啟動他們。他們將預設使用新的協議。

注:改變協議版本並重新啟動可以在代理伺服器升級之後的任何時間做,沒有必要必須立刻就做。

升級到0.10.0.0帶來的潛在的效能影響

0.10.0訊息格式包括一個新的時間戳欄位,並對壓縮的訊息使用相對偏移。磁碟上的訊息格式可以通過在server.properties檔案的log.message.format.version進行配置。預設的磁碟上的訊息格式為0.10.0。如果消費者客戶端的版本是0.10.0.0之前的版本,那它只能明白0.10.0之前的訊息格式。在這種情況下,代理能夠把訊息從0.10.0格式轉換到一個較早的格式再發送舊版本的響應給消費者。然而,代理不能在這種情況下使用零拷貝轉移。Kafka社群報告顯示效能的影響為CPU利用率從20%增加至將近100%,這迫使所有客戶端的必須即時升級使效能恢復正常。為了避免這樣的訊息轉換帶來的效能問題,消費者升級到0.10.0.0之前,在升級代理到0.10.0.0的過程中設定log.message.format.version到0.8.2或0.9.0。這樣一來,代理仍然可以使用零拷貝傳輸,將資料傳送到老消費者。一旦消費者升級完成,訊息格式更改為0.10.0,這樣代理就可以享受新的訊息格式包括新的時間戳和改進的壓縮演算法。這種轉換可以支援相容性,對只有幾個還沒有更新到最新客戶端的應用程式非常有用,但不切實際的是使用一個過度使用的叢集中去支援所有消費者的流量。因此,當代理已經升級,但大多數客戶端還沒有完成升級的情況,要儘可能避免使用這種資訊轉換。

對於升級到0.10.0.0客戶,沒有效能影響。

注:設定訊息格式版本是一個證明,現有的所有支援的訊息都在這個版本或低於該訊息格式的版本。否則, 0.10.0.0之前的消費者可能不能正常工作。特別是訊息格式設定為0.10.0之後,不應該再改回先前的格式,因為它可能使得0.10.0.0之前的消費者工作異常。

注:由於每個訊息中引入了另外的時間戳,生產者傳送的訊息大小比較小的時候因為額外的負載開銷也許會看到吞吐量的下降。同樣,副本的複製會讓每個訊息額外傳輸8個位元組。如果你正在執行接近叢集承載能力的網路容量,你可能會壓垮網絡卡,由於超載而發生故障和效能問題。

注:如果您已對生產者啟用壓縮演算法,您可能會注意到降低的生產者吞吐量和/或在某些情況下代理降低的壓縮比。當接收到壓縮的訊息,0.10.0代理避免再次壓縮訊息,其通常降低了等待時間,並提高了吞吐量。在某些情況下,這可能會減少生產者批量訊息包的大小,這可能導致更糟糕的吞吐量。如果發生這種情況,使用者可以調整生產者的linger.ms和batch.size以獲得更好的吞吐量。此外,用於高效壓縮訊息的生產者緩衝區比代理使用的緩衝區小,這可能對磁碟的壓縮訊息比率有負面的影響。我們打算在未來的Kafka版本中能夠配置這些引數。

0.10.0.0潛在的重大更改

  • 從Kafka0.10.0.0開始,Kafka訊息格式的版本被表示為Kafka版本。例如,訊息格式0.9.0指通過Kafka0.9.0支援的最高訊息版本。
  • 訊息格式0.10.0已經推出,它是預設使用的版本。它引入了一個時間戳欄位和相對偏移被用於壓縮訊息。
  • ProduceRequest /Response V2已經被引入,它在預設