1. 程式人生 > >《從0到1學習Flink》—— Flink 寫入資料到 Kafka

《從0到1學習Flink》—— Flink 寫入資料到 Kafka

開發十年,就只剩下這套架構體系了! >>>   

前言

之前文章 《從0到1學習Flink》—— Flink 寫入資料到 ElasticSearch 寫了如何將 Kafka 中的資料儲存到 ElasticSearch 中,裡面其實就已經用到了 Flink 自帶的 Kafka source connector(FlinkKafkaConsumer)。存入到 ES 只是其中一種情況,那麼如果我們有多個地方需要這份通過 Flink 轉換後的資料,是不是又要我們繼續寫個 sink 的外掛呢?確實,所以 Flink 裡面就預設支援了不少 sink,比如也支援 Kafka sink connector(FlinkKafkaProducer),那麼這篇文章我們就講講如何將資料寫入到 Kafka。

準備

新增依賴

Flink 裡面支援 Kafka 0.8、0.9、0.10、0.11 ,以後有時間可以分析下原始碼的實現。

這裡我們需要安裝下 Kafka,請對應新增對應的 Flink Kafka connector 依賴的版本,這裡我們使用的是 0.11 版本:

1
2
3
4
5
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>

Kafka 安裝

這裡就不寫這塊內容了,可以參考我以前的文章 Kafka 安裝及快速入門

這裡我們演示把其他 Kafka 叢集中 topic 資料原樣寫入到自己本地起的 Kafka 中去。

配置檔案

1
2
3
4
5
6
7
8
9
10
kafka.brokers=xxx:9092,xxx:9092,xxx:9092
kafka.group.id=metrics-group-test
kafka.zookeeper.connect=xxx:2181
metrics.topic=xxx
stream.parallelism=5
kafka.sink.brokers=localhost:9092
kafka.sink.topic=metric-test
stream.checkpoint.interval=1000
stream.checkpoint.enable=false
stream.sink.parallelism=5

目前我們先看下本地 Kafka 是否有這個 metric-test topic 呢?需要執行下這個命令:

1
bin/kafka-topics.sh --list --zookeeper localhost:2181

可以看到本地的 Kafka 是沒有任何 topic 的,如果等下我們的程式執行起來後,再次執行這個命令出現 metric-test topic,那麼證明我的程式確實起作用了,已經將其他叢集的 Kafka 資料寫入到本地 Kafka 了。

程式程式碼

Main.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class Main {
    public static void main(String[] args) throws Exception{
        final ParameterTool parameterTool = ExecutionEnvUtil.createParameterTool(args);
        StreamExecutionEnvironment env = ExecutionEnvUtil.prepare(parameterTool);
        DataStreamSource<Metrics> data = KafkaConfigUtil.buildSource(env);

        data.addSink(new FlinkKafkaProducer011<Metrics>(
                parameterTool.get("kafka.sink.brokers"),
                parameterTool.get("kafka.sink.topic"),
                new MetricSchema()
                )).name("flink-connectors-kafka")
                .setParallelism(parameterTool.getInt("stream.sink.parallelism"));

        env.execute("flink learning connectors kafka");
    }
}

執行結果

啟動程式,檢視執行結果,不段執行上面命令,檢視是否有新的 topic 出來:

執行命令可以檢視該 topic 的資訊:

1
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic metric-test

分析

上面程式碼我們使用 Flink Kafka Producer 只傳了三個引數:brokerList、topicId、serializationSchema(序列化)

其實也可以傳入多個引數進去,現在有的引數用的是預設引數,因為這個內容比較多,後面可以抽出一篇文章單獨來講。

總結

本篇文章寫了 Flink 讀取其他 Kafka 叢集的資料,然後寫入到本地的 Kafka 上。我在 Flink 這層沒做什麼資料轉換,只是原樣的將資料轉發了下,如果你們有什麼其他的需求,是可以在 Flink 這層將資料進行各種轉換操作,比如這篇文章中的一些轉換:《從0到1學習Flink》—— Flink Data transformation(轉換),然後將轉換後的資料