1. 程式人生 > >《KAFKA官方文件》第三章:快速入門

《KAFKA官方文件》第三章:快速入門

快速入門

本教程假設讀者完全從零開始,電腦上沒有已經存在的Kafka和Zookeeper環境。以下內容需要注意的是:因為在類Unix平臺和Windows平臺上的Kafka控制指令碼不同,在Windows平臺上,需要使用路徑\bin\windows代替/bin,指令碼副檔名改為.bat

第一步:下載kafka

>tar -xzf kafka_2.11-0.10.2.0.tgz

>cd kafka_2.11-0.10.2.0

第二步:啟動kafka服務端

Kafka中使用了Zookeeper,所以我們需要先啟動一個Zookeeper服務端。我們可以使用kafka中已經打包好的指令碼方便的完成這個操作,快遞啟動一個單節點的Zookeeper例項。

>bin/zookeeper-server-start.sh config/zookeeper.properties

[2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)

然後啟動kafka服務端:

>bin/kafka-server-start.sh config/server.properties

[2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)

[2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)

第三步:建立主題

現在我們建立一個單一分割槽(partition)並且只有單一複製(replica)的主題,名字叫test

>bin/kafka-topics.sh –create –zookeeper localhost:2181 –replication-factor 1 –partitions 1 –topic test

我們可以使用如下命令列出主題列表:

>bin/kafka-topics.sh –list –zookeeper localhost:2181

test

當然,我們也可以通過在服務端(broker)配置自動建立主題的選項,這樣當有訊息傳送到一個不存在的主題時系統會自動建立它。

第四步:傳送訊息

Kafka自帶了一個命令列工具,它可以從一個檔案或標準輸入流傳送訊息到Kafka叢集。預設情況下,每一行內容將被當做一個單獨的訊息。 執行以下生產者指令碼,然後通過控制檯輸入一些字元,即可作為訊息傳送到服務端:

>bin/kafka-console-producer.sh –broker-list localhost:9092 –topic test

This is a message

This is another message

第五步:啟動消費者

Kafka同樣也提供了一個消費者指令碼,它可以消費掉訊息並輸出到命令列標準輸出流(STDOUT):

>bin/kafka-console-consumer.sh –bootstrap-server localhost:9092 –topic test –from-beginning

This is a message

This is another message

如果我們在不同的終端視窗執行如上的兩個命令,這時就可以在訊息生產者視窗輸入內容,然後在消費者視窗看到它。

所有的命令列工具都有額外的引數,執行命令時不帶任何引數即可顯示出引數資訊詳情。

第六步:啟動一個多broker叢集

到目前為止,我們只啟動了一個單broker。對於Kafka來說,一個單broker也是一個叢集,只不過叢集的大小是1。其實我們啟動一個多broker叢集的話,並不會複雜多少。現在我們來嘗試一下,如何在同一個機器上啟動3個broker節點的叢集。

首先,我們為每一個broker建立一個配置檔案(Windows上使用copy命令代替cp)。

>cp config/server.properties config/server-1.properties

>cp config/server.properties config/server-2.properties

按如下內容編輯各個配置檔案:

“` config/server-1.properties: broker.id=1 listeners=PLAINTEXT://:9093 log.dir=/tmp/kafka-logs-1

config/server-2.properties: broker.id=2 listeners=PLAINTEXT://:9094 log.dir=/tmp/kafka-logs-2 “`

其中broker.id屬性是每個節點在叢集中唯一的名字。埠和日誌儲存目錄則由於我們這幾個節點都在同一臺機器上啟動而必須要修改。 前面的步驟裡我們已經有了一個啟動好的單節點kafka和Zookeeper,現在我們只需要啟動這兩個新配置的節點:

>bin/kafka-server-start.sh config/server-1.properties &

>bin/kafka-server-start.sh config/server-2.properties &

現在我們建立一個複製因子為3的新主題my-replicated-topic:

>bin/kafka-topics.sh –create –zookeeper localhost:2181 –replication-factor 3 –partitions 1 –topic my-replicated-topic

建立完成,但是怎麼才能知道主題被建立在整個叢集中的哪個broker上了呢?事實上我們可以使用如下顯示主題描述資訊的命令:

>bin/kafka-topics.sh –describe –zookeeper localhost:2181 –topic my-replicated-topic

Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:

Topic:my-replicated-topic Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0

此時會輸出主題的描述資訊:第一行給出了所有分割槽資訊的摘要,接下來的每一行則給出具體的每一個分割槽資訊。因為我們前面建立的這個主題只有一個分割槽,所以就只展示了一行。

“leader”節點 “leader”節點負責響應給定節點的所有讀寫操作。每個節點都可能成為所有分割槽中一個隨機選擇分割槽的leader。 “replicas”是複製當前分割槽的節點列表,無論這些節點是不是leader、是不是可用。 “isr”是目前處於同步狀態的replicas集合。它是replicas列表的子集,其中僅包含當前可用並且與leader同步的節點。 注意上述例子中,編號為1的節點是這個只有一個分割槽的主題的leader。

我們可以在最開始建立的主題上運行同樣的命令,看看會發生什麼:

>bin/kafka-topics.sh –describe –zookeeper localhost:2181 –topic test

Topic:test PartitionCount:1 ReplicationFactor:1 Configs:

Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0

結果是很明顯的:原來的主題在編號為0的服務端上,它是我們建立的這個叢集上唯一的服務端,沒有複製節點(replicas)。

現在我們來發布一些訊息到新建立的主題:

>bin/kafka-console-producer.sh –broker-list localhost:9092 –topic my-replicated-topic

my test message 1

my test message 2

^C

然後消費這些訊息:

>bin/kafka-console-consumer.sh –bootstrap-server localhost:9092 –from-beginning –topic my-replicated-topic

my test message 1

my test message 2

^C

接著我們來測試一下容錯性。編號為1的broker現在是leader,我們把它kill掉:

>ps aux | grep server-1.properties

7564 ttys002 0:15.91 /System/Library/Frameworks/JavaVM.framework/Versions/1.8/Home/bin/java…

>kill -9 7564

在Windows上使用:

On Windows use:

>wmic process get processid,caption,commandline | find “java.exe” | find “server-1.properties”

java.exe java -Xmx1G -Xms1G -server -XX:+UseG1GC … build\libs\kafka_2.10-0.10.2.0.jar” kafka.Kafka config\server-1.properties 644

taskkill /pid 644 /f

主節點直接切換到其中的一個從節點,並且編號為1的節點不再位於同步複製節點集合了:

>bin/kafka-topics.sh –describe –zookeeper localhost:2181 –topic my-replicated-topic

Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:

Topic: my-replated-topic Partition: 0 Leader: 2 Replicas: 1,2,0 Isr: 2,0

但是訊息現在仍然對消費者可用,儘管負責處理寫訊息的主節點已經宕掉了:

>bin/kafka-console-consumer.sh –bootstrap-server localhost:9092 –from-beginning –topic my-replicated-topic

my test message 1

my test message 2

^C

第七步:使用Kafka聯結器(Kafka Connect)匯入匯出資料

使用控制檯讀寫資料固然方便,但是有時我們還是希望從其他資料來源匯入資料,或者匯出資料到其他系統。很多時候我們可以無需通過編寫整合程式碼,僅僅使用Kafka聯結器就可以實現資料匯入匯出。

Kafka聯結器是一個用於從Kafka匯入匯出資料的工具。它可以通過擴充套件實現自定義邏輯,或者直接與外部系統互動。在本教程我們將展示如何簡單的使用Kafka聯結器,實現從一個檔案匯入資料到Kafka主題,以及從Kafka主題匯出資料到一個檔案。

首先,我們建立一個測試用的文字檔案:

>echo -e “foo\nbar” > test.txt

然後我們在單機模式啟動兩個聯結器,即它們執行在同一個本地程序。這裡我們使用3個配置檔案作為引數。第一個檔案是針對Kafka聯結器程序的通用配置,包含連線到的Kafka Broker和資料的序列化格式。後面的每一個檔案代表一個聯結器。它們每個都包含一個唯一的聯結器名稱,要例項化的聯結器型別和此聯結器需要的其他配置。

>bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties

這些示例配置檔案都包含在Kafka中,它們使用前面步驟啟動的本地叢集配置,建立兩個聯結器:第一個作為源聯結器,從一個文字檔案讀取資料然後將每一行作為一個訊息寫入到指定的Kafka主題;第二個作為接收端聯結器,從一個Kafka主題讀取訊息,並將每條訊息作為一行寫入指定的文字檔案。

啟動過程中我們可以看到一些日誌資訊,其中包括哪些聯結器被例項化了。一旦Kafka聯結器程序啟動,源聯結器就開始從test.txt檔案讀取資訊,然後把每一行內容作為一個訊息傳送到名為connect-test的主題;接收端聯結器就開始從connect-test主題讀取訊息,然後把每一個訊息內容作為一行寫入test.sink.txt檔案。我們可以檢視這個檔案的內容來驗證這些經過整個訊息管道傳遞的資料:

>cat test.sink.txt

foo

bar

訊息資料被Kafka儲存在connect-test主題中,這樣我們也可以在控制檯啟動一個消費者來檢視主題裡的訊息(或者使用自定義的消費程式碼來處理):

>bin/kafka-console-consumer.sh –bootstrap-server localhost:9092 –topic connect-test –from-beginning

{“schema”:{“type”:”string”,”optional”:false},”payload”:”foo”} {“schema”:{“type”:”string”,”optional”:false},”payload”:”bar”}

這些聯結器會持續的處理資料,因此我們可以通過新增資料到輸入檔案,然後看到訊息通過整個管道: The connectors continue to process data, so we can add data to the file and see it move through the pipeline:

>echo “Another line” >> test.txt

然後我們可以看到這行資料在消費者所在的控制檯以及接收端檔案裡出現。

第八步:使用Kafka流(Kafka Streams)處理資料

Kafka流是一個針對儲存於Kafka brokers上的資料進行實時流處理和分析的客戶端類庫。快速入門中的示例將展示如何使用這個類庫實現一個數據流處理應用。下面是其中的WordCountDemo數單詞示例程式碼片段(轉換成Java8的lambda表示式更便於閱讀)。

“` // 字串和長整型的序列化器與反序列化器(serde) final Serde stringSerde = Serdes.String(); final Serde longSerde = Serdes.Long();

// 從streams-file-input主題構造一個KStream,訊息值代表了文字的每一行(這裡我們忽略訊息key中儲存的資料) KStream<String, String> textLines = builder.stream(stringSerde, stringSerde, “streams-file-input”);

KTable<String, Long> wordCounts = textLines // 按空格拆分每個文字行為多個單獨的詞 .flatMapValues(value -> Arrays.asList(value.toLowerCase().split(“\W+”)))

// 將單詞分組後作為訊息的key
.groupBy((key, value) -> value)

// 統計每個單詞出現的次數(即訊息的key)
.count("Counts")

// 將執行結果作為變更記錄流傳送到輸出主題 wordCounts.to(stringSerde, longSerde, “streams-wordcount-output”); “`

上面的程式碼實現了數單詞演算法(WordCount algorithm),即計算了輸入文字中每一個單詞出現的次數。不同於我們常見的計算給定數量文字的數單詞演算法,這個示例被設計來操作一個無限的、不確定資料量的資料流。跟有界的情況類似,這是一個有狀態的演算法,會跟蹤和更新單詞的數目。此演算法必須假定輸入的資料是沒有邊界的,這樣因為不知道什麼時候處理完所有的資料,所以每當處理了新輸入的資料時,上述程式碼會隨時輸出當前的狀態和處理結果。

我們先準備傳送到Kafka主題的輸入資料,這些資料將被Kafka流程式依次處理。

>echo -e “all streams lead to kafka\nhello kafka streams\njoin kafka summit” > file-input.txt

在Windows上執行:

>echo all streams lead to kafka> file-input.txt

>echo hello kafka streams>> file-input.txt

>echo|set /p=join kafka summit>> file-input.txt

接著,我們控制檯上使用生產者指令碼將這些資料傳送到一個叫streams-file-input的主題,此指令碼會從標準輸入(STDIN)一行行的讀取資料,然後把每一行內容作為一個單獨的、key為null、值為字串格式的Kafka訊息,傳送到這個主題(在實際應用中,只要應用程式一直執行,資料就可以一直持續的流向Kafka):

>bin/kafka-topics.sh –create –zookeeper localhost:2181 –replication-factor 1 –partitions 1 –topic streams-file-input

>bin/kafka-console-producer.sh –broker-list localhost:9092 –topic streams-file-input < file-input.txt

此時我們可以執行數單詞示例程式來處理輸入資料:

>bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo

示例程式將從streams-file-input主題讀入資料,然後用數單詞演算法計算每一個訊息資料,持續將當前計算結果寫入到streams-wordcount-output主題。到目前為止,我們在任何命令列的標準輸出上看不到這些結果,因為這些結果資料都被寫入到Kafka輸出主題streams-wordcount-output了。這個示例程式也將會一直執行,不像常見的流處理程式會在處理完以後自動結束。

現在我們可以通過讀取Kafka輸出主題來檢視數單詞示例程式的輸出:

>bin/kafka-console-consumer.sh –bootstrap-server localhost:9092 –topic streams-wordcount-output –from-beginning –formatter kafka.tools.DefaultMessageFormatter –property print.key=true –property print.value=true –property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer –property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

可以看到在控制檯上輸出如下:

all 1

lead 1

to 1

hello 1

streams 2

join 1

kafka 3

summit 1

第一列是java.lang.String型別的Kafka訊息key,第二列是java.lang.Long型別的訊息值。注意到輸出的實際上是一個持續更新的流,每一個數據記錄(上面輸出的每一行)代表一個單詞(比如kafka)每次更新的計數。對於同一個key,會有多個記錄,每一個後面的記錄代表之前記錄的更新結果。

下面的兩幅圖展示了這個場景下到底發生了什麼。圖上的第一列展示了計算單詞出現次數的KTable<String, Long>的當前狀態演化。第二列展示了狀態更新導致的KTable變化記錄,也是被髮送到Kafka輸出主題streams-wordcount-output中的資料。

“all stream lead to kafka”這行文字先被處理。隨著每一個新單詞作為一個表格中的新項(綠色背景高亮顯示)加入到KTable,KTable表格逐漸增長,同時相應的變化記錄被髮送到下面的KStream中。

當第二行文字“hello kafka streams”被處理後,我們可以看到與此同時在KTable中已經存在的項立即被更新(即kafak和streams)。同樣的變化記錄也被髮送到輸出主題。

第三行處理也類似,我們暫且略去。這就解釋了為何輸出主題中的內容如上所,因為它包含了全部的變化記錄。

如果我們跳出這個具體的示例來看,Kafka流所做的事情就是表和變更記錄流之間的相互作用(這裡表指的是KTable,變更記錄流指的是下面的KStream):表中的每一個變化記錄會發送到流中,當然如果我們從頭至尾的消費一個完整的變更記錄流,則可以重建這個表的全部內容。

現在我們可以寫入更多訊息到stream-file-input主題,然後觀察這些訊息被新增到streams-wordcount-output主題,這些訊息反映出了更新的單詞計數(可以在控制檯使用上面提及的生產者指令碼和消費者指令碼來操作)。

最後我們可以在控制檯使用Ctrl-C快捷鍵來結束消費者。