學習kafka教程(三)
歡迎關注公眾號:n平方
本文主要介紹【 Kafka Streams的架構和使用 】
目標
- 瞭解kafka streams的架構。
- 掌握kafka streams程式設計。
架構分析
總體
Kafka流通過構建Kafka生產者和消費者庫,並利用Kafka的本地功能來提供資料並行性、分散式協調、容錯和操作簡單性,從而簡化了應用程式開發。
下圖展示了一個使用Kafka Streams庫的應用程式的結構。
流分割槽和任務
Kafka的訊息傳遞層對資料進行分割槽,以儲存和傳輸資料。Kafka流劃分資料進行處理。在這兩種情況下,這種分割槽都支援資料區域性性、靈活性、可伸縮性、高效能和容錯性。Kafka流使用分割槽和任務的概念作為基於Kafka主題分割槽的並行模型的邏輯單元。Kafka流與Kafka在並行性上下文中有著緊密的聯絡:
- 每個流分割槽都是一個完全有序的資料記錄序列,並對映到Kafka主題分割槽。
- 流中的資料記錄對映到來自該主題的Kafka訊息。
- 資料記錄的鍵值決定了Kafka流和Kafka流中資料的分割槽,即,如何將資料路由到主題中的特定分割槽。
應用程式的處理器拓撲通過將其分解為多個任務進行擴充套件。
更具體地說,Kafka流基於應用程式的輸入流分割槽建立固定數量的任務,每個任務分配一個來自輸入流的分割槽列表(例如,kafka的topic)。分配給任務的分割槽永遠不會改變,因此每個任務都是應用程式並行性的固定單元。
然後,任務可以基於分配的分割槽例項化自己的處理器拓撲;它們還為每個分配的分割槽維護一個緩衝區,並從這些記錄緩衝區一次處理一條訊息。
因此,流任務可以獨立並行地處理,而無需人工干預。
理解Kafka流不是一個資源管理器,而是一個“執行”其流處理應用程式執行的任何地方的庫。應用程式的多個例項要麼在同一臺機器上執行,要麼分佈在多臺機器上,庫可以自動將任務分配給執行應用程式例項的那些例項。分配給任務的分割槽從未改變;如果應用程式例項失敗,它分配的所有任務將在其他例項上自動重新啟動,並繼續從相同的流分割槽使用。
下圖顯示了兩個任務,每個任務分配一個輸入流分割槽。
執行緒模型
Kafka流允許使用者配置庫用於在應用程式例項中並行處理的執行緒數。每個執行緒可以獨立地使用其處理器拓撲執行一個或多個任務。
例如,下圖顯示了一個流執行緒執行兩個流任務。
啟動更多的流執行緒或應用程式例項僅僅相當於複製拓撲並讓它處理Kafka分割槽的不同子集,從而有效地並行處理。值得注意的是,執行緒之間不存在共享狀態,因此不需要執行緒間的協調。這使得跨應用程式例項和執行緒並行執行拓撲變得非常簡單。Kafka主題分割槽在各種流執行緒之間的分配是由Kafka流利用Kafka的協調功能透明地處理的。
如上所述,使用Kafka流擴充套件您的流處理應用程式很容易:您只需要啟動應用程式的其他例項,Kafka流負責在應用程式例項中執行的任務之間分配分割槽。您可以啟動與輸入Kafka主題分割槽一樣多的應用程式執行緒,以便在應用程式的所有執行例項中,每個執行緒(或者更確切地說,它執行的任務)至少有一個輸入分割槽要處理。
本地狀態儲存
Kafka流提供了所謂的狀態儲存,流處理應用程式可以使用它來儲存和查詢資料,這是實現有狀態操作時的一項重要功能。例如,Kafka Streams DSL在呼叫有狀態操作符(如join()或aggregate())或開啟流視窗時自動建立和管理這樣的狀態儲存。
Kafka Streams應用程式中的每個流任務都可以嵌入一個或多個本地狀態儲存,這些儲存可以通過api訪問,以儲存和查詢處理所需的資料。Kafka流為這種本地狀態儲存提供容錯和自動恢復功能。
下圖顯示了兩個流任務及其專用的本地狀態儲存。
容錯
Kafka流構建於Kafka中本地整合的容錯功能之上。Kafka分割槽是高度可用和複製的;因此,當流資料持久化到Kafka時,即使應用程式失敗並需要重新處理它,流資料也是可用的。Kafka流中的任務利用Kafka消費者客戶端提供的容錯功能來處理失敗。如果任務在失敗的機器上執行,Kafka流將自動在應用程式的一個剩餘執行例項中重新啟動該任務。
此外,Kafka流還確保本地狀態儲存對於故障也是健壯的。對於每個狀態儲存,它維護一個複製的changelog Kafka主題,其中跟蹤任何狀態更新。這些變更日誌主題也被分割槽,這樣每個本地狀態儲存例項,以及訪問該儲存的任務,都有自己專用的變更日誌主題分割槽。在changelog主題上啟用了日誌壓縮,這樣可以安全地清除舊資料,防止主題無限增長。如果任務在一臺失敗的機器上執行,並在另一臺機器上重新啟動,Kafka流通過在恢復對新啟動的任務的處理之前重播相應的更改日誌主題,確保在失敗之前將其關聯的狀態儲存恢復到內容。因此,故障處理對終端使用者是完全透明的。
程式設計例項
管道(輸入輸出)例項
就是控制檯輸入到kafka中,經過處理輸出。
package com.example.kafkastreams.demo; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; import java.util.Properties; import java.util.concurrent.CountDownLatch; public class PipeDemo { public static void main(String[] args) { Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); final StreamsBuilder builder = new StreamsBuilder(); builder.stream("streams-plaintext-input").to("streams-pipe-output"); final Topology topology = builder.build(); final KafkaStreams streams = new KafkaStreams(topology, props); final CountDownLatch latch = new CountDownLatch(1); // attach shutdown handler to catch control-c Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") { @Override public void run() { streams.close(); latch.countDown(); } }); try { streams.start(); latch.await(); } catch (Throwable e) { System.exit(1); } System.exit(0); } }
分詞例項
就是將你輸入的字串進行分詞輸出。
package com.example.kafkastreams.demo; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.kstream.KStream; import java.util.Arrays; import java.util.Properties; import java.util.concurrent.CountDownLatch; public class LineSplitDemo { public static void main(String[] args) throws Exception { Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-linesplit"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); final StreamsBuilder builder = new StreamsBuilder(); KStream<String, String> source = builder.stream("streams-plaintext-input"); source.flatMapValues(value -> Arrays.asList(value.split("\\W+"))) .to("streams-linesplit-output"); final Topology topology = builder.build(); final KafkaStreams streams = new KafkaStreams(topology, props); final CountDownLatch latch = new CountDownLatch(1); // attach shutdown handler to catch control-c Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") { @Override public void run() { streams.close(); latch.countDown(); } }); try { streams.start(); latch.await(); } catch (Throwable e) { System.exit(1); } System.exit(0); } }
詞彙統計例項
將你輸入的字串進行按單詞統計輸出。
package com.example.kafkastreams.demo; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.Produced; import org.apache.kafka.streams.state.KeyValueStore; import java.util.Arrays; import java.util.Locale; import java.util.Properties; import java.util.concurrent.CountDownLatch; public class WordCountDemo { public static void main(String[] args) throws Exception { Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); final StreamsBuilder builder = new StreamsBuilder(); KStream<String, String> source = builder.stream("streams-plaintext-input"); source.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+"))) .groupBy((key, value) -> value) .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store")) .toStream() .to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long())); final Topology topology = builder.build(); final KafkaStreams streams = new KafkaStreams(topology, props); final CountDownLatch latch = new CountDownLatch(1); // attach shutdown handler to catch control-c Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") { @Override public void run() { streams.close(); latch.countDown(); } }); try { streams.start(); latch.await(); } catch (Throwable e) { System.exit(1); } System.exit(0); } }
最後
本人水平有限,歡迎各位建議以及指正。順便關注一下公眾號唄,會經常更新文章的哦。