學習kafka教程(二)
歡迎關注公眾號:n平方
本文主要介紹【KafkaStreams 】
簡介
Kafka Streams編寫關鍵任務實時應用程式和微服務的最簡單方法,是一個用於構建應用程式和微服務的客戶端庫,其中輸入和輸出資料儲存在Kafka叢集中。它結合了在客戶端編寫和部署標準Java和Scala應用程式的簡單性和Kafka伺服器端叢集技術的優點。
Kafka Streams是一個用於構建關鍵任務實時應用程式和微服務的客戶端庫,其中輸入和/或輸出資料儲存在Kafka叢集中。Kafka Streams結合了在客戶端編寫和部署標準Java和Scala應用程式的簡單性和Kafka伺服器端叢集技術的優點,使這些應用程式具有高度可伸縮性、靈活性、容錯性、分散式等等。
目標
- 瞭解kafka Streams
- 會使用kafka Streams
過程
1.首先WordCountDemo示例程式碼(Java8以上)
// Serializers/deserializers (serde) for String and Long types final Serde<String> stringSerde = Serdes.String(); final Serde<Long> longSerde = Serdes.Long(); // Construct a `KStream` from the input topic "streams-plaintext-input", where message values // represent lines of text (for the sake of this example, we ignore whatever may be stored // in the message keys). KStream<String, String> textLines = builder.stream("streams-plaintext-input", Consumed.with(stringSerde, stringSerde); KTable<String, Long> wordCounts = textLines // Split each text line, by whitespace, into words. .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+"))) // Group the text words as message keys .groupBy((key, value) -> value) // Count the occurrences of each word (message key). .count() // Store the running counts as a changelog stream to the output topic. wordCounts.toStream().to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));
它實現了WordCount演算法,該演算法從輸入文字計算單詞出現的直方圖。然而,與您以前可能看到的對有界資料進行操作的其他WordCount示例不同,WordCount演示應用程式的行為略有不同,因為它被設計為對無限、無界的資料流進行操作。與有界變數類似,它是一種有狀態演算法,用於跟蹤和更新單詞的計數。然而,由於它必須假定輸入資料可能是無界的,因此它將週期性地輸出當前狀態和結果,同時繼續處理更多的資料,因為它不知道何時處理了“所有”輸入資料。
2.安裝並啟動zookeeper和kafka
bin/zookeeper-server-start.sh config/zookeeper.properties bin/kafka-server-start.sh config/server.properties
3.建立主題
接下來,我們建立名為streams-plain -input的輸入主題和名為streams-wordcount-output的輸出主題:
bin/kafka-topics.sh --create \ --zookeeper localhost:2181 \ --replication-factor 1 \ --partitions 1 \ --topic streams-plaintext-input Created topic "streams-plaintext-input"
我們建立啟用壓縮的輸出主題,因為輸出流是一個變更日誌流.
bin/kafka-topics.sh --create \ --zookeeper localhost:2181 \ --replication-factor 1 \ --partitions 1 \ --topic streams-wordcount-output \ --config cleanup.policy=compact Created topic "streams-wordcount-output"
建立的主題也可以使用相同的kafka主題進行描述
bin/kafka-topics.sh --zookeeper localhost:2181 --describe
4.啟動Wordcount應用程式
bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo
a)演示應用程式將從輸入主題流(明文輸入)中讀取,對每個讀取的訊息執行WordCount演算法的計算,並不斷將其當前結果寫入輸出主題流(WordCount -output)。因此,除了日誌條目之外,不會有任何STDOUT輸出,因為結果是用Kafka寫回去的。
b)現在我們可以在一個單獨的終端上啟動控制檯生成器,向這個主題寫入一些輸入資料和檢查輸出的WordCount演示應用程式從其輸出主題與控制檯消費者在一個單獨的終端.
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \ --topic streams-wordcount-output \ --from-beginning \ --formatter kafka.tools.DefaultMessageFormatter \ --property print.key=true \ --property print.value=true \ --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \ --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
c)輸入端: 現在讓我們使用控制檯生成器將一些訊息寫入輸入主題流——純文字輸入,方法是輸入一行文字,然後單擊。這將傳送新訊息輸入主題,訊息鍵為空和訊息值是剛才輸入的字串編碼的文字行。
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-plaintext-input
此時你可以在控制檯 輸入如下字元:
all streams lead to kafka
d))輸出端: 此訊息將由Wordcount應用程式處理,以下輸出資料將寫入streams-wordcount-output主題並由控制檯使用者列印:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \ --topic streams-wordcount-output \ --from-beginning \ --formatter kafka.tools.DefaultMessageFormatter \ --property print.key=true \ --property print.value=true \ --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \ --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
這個時候會接收到剛剛在控制檯 輸入的單詞統計結果:
all1 streams 1 lead1 to1 kafka1
如此類推:你可以在輸入端 輸入單詞,對應的在輸出端 就會有統計結果。
小結:
可以看到,Wordcount應用程式的輸出實際上是連續的更新流,其中每個輸出記錄(即上面原始輸出中的每一行)是單個單詞的更新計數,也就是記錄鍵,如“kafka”。對於具有相同鍵的多個記錄,後面的每個記錄都是前一個記錄的更新。
下面的兩個圖說明了幕後的本質。第一列顯示KTable的當前狀態的演變,該狀態為count計算單詞出現的次數。第二列顯示KTable的狀態更新所產生的更改記錄,這些記錄被髮送到輸出Kafka主題流-wordcount-output。
最後
本人水平有限,歡迎各位建議以及指正。順便關注一下公眾號唄,會經常更新文章的哦。