1. 程式人生 > >Flink的sink實戰之二:kafka

Flink的sink實戰之二:kafka

### 歡迎訪問我的GitHub [https://github.com/zq2599/blog_demos](https://github.com/zq2599/blog_demos) 內容:所有原創文章分類彙總及配套原始碼,涉及Java、Docker、Kubernetes、DevOPS等; ### 本篇概覽 本文是《Flink的sink實戰》系列的第二篇,前文[《Flink的sink實戰之一:初探》](https://blog.csdn.net/boling_cavalry/article/details/105597628)對sink有了基本的瞭解,本章來體驗將資料sink到kafka的操作; ### 全系列連結 1. [《Flink的sink實戰之一:初探》](https://blog.csdn.net/boling_cavalry/article/details/105597628) 2. [《Flink的sink實戰之二:kafka》](https://blog.csdn.net/boling_cavalry/article/details/105598224) 3. [《Flink的sink實戰之三:cassandra3》](https://blog.csdn.net/boling_cavalry/article/details/105598968) 4. [《Flink的sink實戰之四:自定義》](https://blog.csdn.net/boling_cavalry/article/details/105599511) ### 版本和環境準備 本次實戰的環境和版本如下: 1. JDK:1.8.0_211 2. Flink:1.9.2 3. Maven:3.6.0 4. 作業系統:macOS Catalina 10.15.3 (MacBook Pro 13-inch, 2018) 5. IDEA:2018.3.5 (Ultimate Edition) 6. Kafka:2.4.0 7. Zookeeper:3.5.5 請確保上述環境和服務已經就緒; ### 原始碼下載 如果您不想寫程式碼,整個系列的原始碼可在GitHub下載到,地址和連結資訊如下表所示(https://github.com/zq2599/blog_demos): | 名稱 | 連結 | 備註| | :-------- | :----| :----| | 專案主頁| https://github.com/zq2599/blog_demos | 該專案在GitHub上的主頁 | | git倉庫地址(https)| https://github.com/zq2599/blog_demos.git | 該專案原始碼的倉庫地址,https協議 | | git倉庫地址(ssh)| [email protected]:zq2599/blog_demos.git | 該專案原始碼的倉庫地址,ssh協議 | 這個git專案中有多個資料夾,本章的應用在flinksinkdemo資料夾下,如下圖紅框所示: ![在這裡插入圖片描述](https://img2020.cnblogs.com/other/485422/202011/485422-20201109081046069-1123443178.png) 準備完畢,開始開發; ### 準備工作 正式編碼前,先去官網檢視相關資料瞭解基本情況: 1. 地址:https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/connectors/kafka.html 2. 我這裡用的kafka是2.4.0版本,在官方文件查詢對應的庫和類,如下圖紅框所示: ![在這裡插入圖片描述](https://img2020.cnblogs.com/other/485422/202011/485422-20201109081046570-900671175.png) ### kafka準備 1. 建立名為test006的topic,有四個分割槽,參考命令: ```shell ./kafka-topics.sh \ --create \ --bootstrap-server 127.0.0.1:9092 \ --replication-factor 1 \ --partitions 4 \ --topic test006 ``` 2. 在控制檯消費test006的訊息,參考命令: ```shell ./kafka-console-consumer.sh \ --bootstrap-server 127.0.0.1:9092 \ --topic test006 ``` 3. 此時如果該topic有訊息進來,就會在控制檯輸出; 4. 接下來開始編碼; ### 建立工程 1. 用maven命令建立flink工程: ```shell mvn \ archetype:generate \ -DarchetypeGroupId=org.apache.flink \ -DarchetypeArtifactId=flink-quickstart-java \ -DarchetypeVersion=1.9.2 ``` 2. 根據提示,groupid輸入com.bolingcavalry,artifactid輸入flinksinkdemo,即可建立一個maven工程; 3. 在pom.xml中增加kafka依賴庫: ```xml ``` 4. 工程建立完成,開始編寫flink任務的程式碼; ### 傳送字串訊息的sink 先嚐試傳送字串型別的訊息: 1. 建立KafkaSerializationSchema介面的實現類,後面這個類要作為建立sink物件的引數使用: ```java package com.bolingcavalry.addsink; import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema; import org.apache.kafka.clients.producer.ProducerRecord; import java.nio.charset.StandardCharsets; public class ProducerStringSerializationSchema implements KafkaSerializati