1. 程式人生 > >Flink的DataSource三部曲之一:直接API

Flink的DataSource三部曲之一:直接API

### 歡迎訪問我的GitHub [https://github.com/zq2599/blog_demos](https://github.com/zq2599/blog_demos) 內容:所有原創文章分類彙總及配套原始碼,涉及Java、Docker、Kubernetes、DevOPS等; 本文是《Flink的DataSource三部曲》系列的第一篇,該系列旨在通過實戰學習和了解Flink的DataSource,為以後的深入學習打好基礎,由以下三部分組成: 1. 直接API:即本篇,除了準備環境和工程,還學習了StreamExecutionEnvironment提供的用來建立資料來的API; 2. 內建connector:StreamExecutionEnvironment的addSource方法,入參可以是flink內建的connector,例如kafka、RabbitMQ等; 3. 自定義:StreamExecutionEnvironment的addSource方法,入參可以是自定義的SourceFunction實現類; ### Flink的DataSource三部曲文章連結 1. [《Flink的DataSource三部曲之一:直接API》](https://blog.csdn.net/boling_cavalry/article/details/105467076) 2. [《Flink的DataSource三部曲之二:內建connector》](https://blog.csdn.net/boling_cavalry/article/details/105471798) 3. [《Flink的DataSource三部曲之三:自定義》](https://blog.csdn.net/boling_cavalry/article/details/105472218) ### 關於Flink的DataSource 官方對DataSource的解釋:Sources are where your program reads its input from,即DataSource是應用的資料來源,如下圖的兩個紅框所示: ![在這裡插入圖片描述](https://img2020.cnblogs.com/other/485422/202011/485422-20201105111958456-1024455985.png) ### DataSource型別 對於常見的文字讀入、kafka、RabbitMQ等資料來源,可以直接使用Flink提供的API或者connector,如果這些滿足不了需求,還可以自己開發,下圖是我按照自己的理解梳理的: ![在這裡插入圖片描述](https://img2020.cnblogs.com/other/485422/202011/485422-20201105112000377-1147903641.png) ### 環境和版本 熟練掌握內建DataSource的最好辦法就是實戰,本次實戰的環境和版本如下: 1. JDK:1.8.0_211 2. Flink:1.9.2 3. Maven:3.6.0 4. 作業系統:macOS Catalina 10.15.3 (MacBook Pro 13-inch, 2018) 5. IDEA:2018.3.5 (Ultimate Edition) ### 原始碼下載 如果您不想寫程式碼,整個系列的原始碼可在GitHub下載到,地址和連結資訊如下表所示(https://github.com/zq2599/blog_demos): | 名稱 | 連結 | 備註| | :-------- | :----| :----| | 專案主頁| https://github.com/zq2599/blog_demos | 該專案在GitHub上的主頁 | | git倉庫地址(https)| https://github.com/zq2599/blog_demos.git | 該專案原始碼的倉庫地址,https協議 | | git倉庫地址(ssh)| [email protected]:zq2599/blog_demos.git | 該專案原始碼的倉庫地址,ssh協議 | 這個git專案中有多個資料夾,本章的應用在flinkdatasourcedemo資料夾下,如下圖紅框所示: ![在這裡插入圖片描述](https://img2020.cnblogs.com/other/485422/202011/485422-20201105112000799-1836442467.png) ### 環境和版本 本次實戰的環境和版本如下: 1. JDK:1.8.0_211 2. Flink:1.9.2 3. Maven:3.6.0 4. 作業系統:macOS Catalina 10.15.3 (MacBook Pro 13-inch, 2018) 5. IDEA:2018.3.5 (Ultimate Edition) ### 建立工程 1. 在控制檯執行以下命令就會進入建立flink應用的互動模式,按提示輸入gourpId和artifactId,就會建立一個flink應用(我輸入的groupId是com.bolingcavalry,artifactId是flinkdatasourcedemo): ```shell mvn \ archetype:generate \ -DarchetypeGroupId=org.apache.flink \ -DarchetypeArtifactId=flink-quickstart-java \ -DarchetypeVersion=1.9.2 ``` 2. 現在maven工程已生成,用IDEA匯入這個工程,如下圖: ![在這裡插入圖片描述](https://img2020.cnblogs.com/other/485422/202011/485422-20201105112001332-11218706.png) 3. 以maven的型別匯入: ![在這裡插入圖片描述](https://img2020.cnblogs.com/other/485422/202011/485422-20201105112001759-1931546099.png) 4. 匯入成功的樣子: ![在這裡插入圖片描述](https://img2020.cnblogs.com/other/485422/202011/485422-20201105112002204-2035126840.png) 5. 專案建立成功,可以開始寫程式碼實戰了; ### 輔助類Splitter 實戰中有個功能常用到:將字串用空格分割,轉成Tuple2型別的集合,這裡將此運算元做成一個公共類Splitter.java,程式碼如下: ```java package com.bolingcavalry; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.util.Collector; import org.apache.flink.util.StringUtils; public class Splitter implements FlatMap