1. 程式人生 > >《從0到1學習Flink》—— Flink 寫入數據到 Kafka

《從0到1學習Flink》—— Flink 寫入數據到 Kafka

apach schema exceptio 微信公眾 通過 parallel sin 序列化 快速入門

技術分享圖片

前言

之前文章 《從0到1學習Flink》—— Flink 寫入數據到 ElasticSearch 寫了如何將 Kafka 中的數據存儲到 ElasticSearch 中,裏面其實就已經用到了 Flink 自帶的 Kafka source connector(FlinkKafkaConsumer)。存入到 ES 只是其中一種情況,那麽如果我們有多個地方需要這份通過 Flink 轉換後的數據,是不是又要我們繼續寫個 sink 的插件呢?確實,所以 Flink 裏面就默認支持了不少 sink,比如也支持 Kafka sink connector(FlinkKafkaProducer),那麽這篇文章我們就講講如何將數據寫入到 Kafka。

技術分享圖片

準備

添加依賴

Flink 裏面支持 Kafka 0.8、0.9、0.10、0.11 ,以後有時間可以分析下源碼的實現。

技術分享圖片

這裏我們需要安裝下 Kafka,請對應添加對應的 Flink Kafka connector 依賴的版本,這裏我們使用的是 0.11 版本:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>

Kafka 安裝

這裏就不寫這塊內容了,可以參考我以前的文章 Kafka 安裝及快速入門。

這裏我們演示把其他 Kafka 集群中 topic 數據原樣寫入到自己本地起的 Kafka 中去。

配置文件

kafka.brokers=xxx:9092,xxx:9092,xxx:9092
kafka.group.id=metrics-group-test
kafka.zookeeper.connect=xxx:2181
metrics.topic=xxx
stream.parallelism=5
kafka.sink.brokers=localhost:9092
kafka.sink.topic=metric-test
stream.checkpoint.interval=1000
stream.checkpoint.enable=false
stream.sink.parallelism=5

目前我們先看下本地 Kafka 是否有這個 metric-test topic 呢?需要執行下這個命令:

bin/kafka-topics.sh --list --zookeeper localhost:2181

技術分享圖片

可以看到本地的 Kafka 是沒有任何 topic 的,如果等下我們的程序運行起來後,再次執行這個命令出現 metric-test topic,那麽證明我的程序確實起作用了,已經將其他集群的 Kafka 數據寫入到本地 Kafka 了。

程序代碼

Main.java

public class Main {
    public static void main(String[] args) throws Exception{
        final ParameterTool parameterTool = ExecutionEnvUtil.createParameterTool(args);
        StreamExecutionEnvironment env = ExecutionEnvUtil.prepare(parameterTool);
        DataStreamSource<Metrics> data = KafkaConfigUtil.buildSource(env);

        data.addSink(new FlinkKafkaProducer011<Metrics>(
                parameterTool.get("kafka.sink.brokers"),
                parameterTool.get("kafka.sink.topic"),
                new MetricSchema()
                )).name("flink-connectors-kafka")
                .setParallelism(parameterTool.getInt("stream.sink.parallelism"));

        env.execute("flink learning connectors kafka");
    }
}

運行結果

啟動程序,查看運行結果,不段執行上面命令,查看是否有新的 topic 出來:

技術分享圖片

執行命令可以查看該 topic 的信息:

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic metric-test

技術分享圖片

分析

上面代碼我們使用 Flink Kafka Producer 只傳了三個參數:brokerList、topicId、serializationSchema(序列化)

技術分享圖片

其實也可以傳入多個參數進去,現在有的參數用的是默認參數,因為這個內容比較多,後面可以抽出一篇文章單獨來講。

總結

本篇文章寫了 Flink 讀取其他 Kafka 集群的數據,然後寫入到本地的 Kafka 上。我在 Flink 這層沒做什麽數據轉換,只是原樣的將數據轉發了下,如果你們有什麽其他的需求,是可以在 Flink 這層將數據進行各種轉換操作,比如這篇文章中的一些轉換:《從0到1學習Flink》—— Flink Data transformation(轉換),然後將轉換後的數據發到 Kafka 上去。

本文原創地址是: http://www.54tianzhisheng.cn/2019/01/06/Flink-Kafka-sink/ , 未經允許禁止轉載。

關註我

微信公眾號:zhisheng

另外我自己整理了些 Flink 的學習資料,目前已經全部放到微信公眾號了。你可以加我的微信:zhisheng_tian,然後回復關鍵字:Flink 即可無條件獲取到。

技術分享圖片

Github 代碼倉庫

https://github.com/zhisheng17/flink-learning/

以後這個項目的所有代碼都將放在這個倉庫裏,包含了自己學習 flink 的一些 demo 和博客

相關文章

1、《從0到1學習Flink》—— Apache Flink 介紹

2、《從0到1學習Flink》—— Mac 上搭建 Flink 1.6.0 環境並構建運行簡單程序入門

3、《從0到1學習Flink》—— Flink 配置文件詳解

4、《從0到1學習Flink》—— Data Source 介紹

5、《從0到1學習Flink》—— 如何自定義 Data Source ?

6、《從0到1學習Flink》—— Data Sink 介紹

7、《從0到1學習Flink》—— 如何自定義 Data Sink ?

8、《從0到1學習Flink》—— Flink Data transformation(轉換)

9、《從0到1學習Flink》—— 介紹Flink中的Stream Windows

10、《從0到1學習Flink》—— Flink 中的幾種 Time 詳解

11、《從0到1學習Flink》—— Flink 寫入數據到 ElasticSearch

12、《從0到1學習Flink》—— Flink 項目如何運行?

13、《從0到1學習Flink》—— Flink 寫入數據到 Kafka

《從0到1學習Flink》—— Flink 寫入數據到 Kafka