Flink的DataSource三部曲之一:直接API
阿新 • • 發佈:2020-11-05
### 歡迎訪問我的GitHub
[https://github.com/zq2599/blog_demos](https://github.com/zq2599/blog_demos)
內容:所有原創文章分類彙總及配套原始碼,涉及Java、Docker、Kubernetes、DevOPS等;
本文是《Flink的DataSource三部曲》系列的第一篇,該系列旨在通過實戰學習和了解Flink的DataSource,為以後的深入學習打好基礎,由以下三部分組成:
1. 直接API:即本篇,除了準備環境和工程,還學習了StreamExecutionEnvironment提供的用來建立資料來的API;
2. 內建connector:StreamExecutionEnvironment的addSource方法,入參可以是flink內建的connector,例如kafka、RabbitMQ等;
3. 自定義:StreamExecutionEnvironment的addSource方法,入參可以是自定義的SourceFunction實現類;
### Flink的DataSource三部曲文章連結
1. [《Flink的DataSource三部曲之一:直接API》](https://blog.csdn.net/boling_cavalry/article/details/105467076)
2. [《Flink的DataSource三部曲之二:內建connector》](https://blog.csdn.net/boling_cavalry/article/details/105471798)
3. [《Flink的DataSource三部曲之三:自定義》](https://blog.csdn.net/boling_cavalry/article/details/105472218)
### 關於Flink的DataSource
官方對DataSource的解釋:Sources are where your program reads its input from,即DataSource是應用的資料來源,如下圖的兩個紅框所示:
![在這裡插入圖片描述](https://img2020.cnblogs.com/other/485422/202011/485422-20201105111958456-1024455985.png)
### DataSource型別
對於常見的文字讀入、kafka、RabbitMQ等資料來源,可以直接使用Flink提供的API或者connector,如果這些滿足不了需求,還可以自己開發,下圖是我按照自己的理解梳理的:
![在這裡插入圖片描述](https://img2020.cnblogs.com/other/485422/202011/485422-20201105112000377-1147903641.png)
### 環境和版本
熟練掌握內建DataSource的最好辦法就是實戰,本次實戰的環境和版本如下:
1. JDK:1.8.0_211
2. Flink:1.9.2
3. Maven:3.6.0
4. 作業系統:macOS Catalina 10.15.3 (MacBook Pro 13-inch, 2018)
5. IDEA:2018.3.5 (Ultimate Edition)
### 原始碼下載
如果您不想寫程式碼,整個系列的原始碼可在GitHub下載到,地址和連結資訊如下表所示(https://github.com/zq2599/blog_demos):
| 名稱 | 連結 | 備註|
| :-------- | :----| :----|
| 專案主頁| https://github.com/zq2599/blog_demos | 該專案在GitHub上的主頁 |
| git倉庫地址(https)| https://github.com/zq2599/blog_demos.git | 該專案原始碼的倉庫地址,https協議 |
| git倉庫地址(ssh)| [email protected]:zq2599/blog_demos.git | 該專案原始碼的倉庫地址,ssh協議 |
這個git專案中有多個資料夾,本章的應用在flinkdatasourcedemo資料夾下,如下圖紅框所示:
![在這裡插入圖片描述](https://img2020.cnblogs.com/other/485422/202011/485422-20201105112000799-1836442467.png)
### 環境和版本
本次實戰的環境和版本如下:
1. JDK:1.8.0_211
2. Flink:1.9.2
3. Maven:3.6.0
4. 作業系統:macOS Catalina 10.15.3 (MacBook Pro 13-inch, 2018)
5. IDEA:2018.3.5 (Ultimate Edition)
### 建立工程
1. 在控制檯執行以下命令就會進入建立flink應用的互動模式,按提示輸入gourpId和artifactId,就會建立一個flink應用(我輸入的groupId是com.bolingcavalry,artifactId是flinkdatasourcedemo):
```shell
mvn \
archetype:generate \
-DarchetypeGroupId=org.apache.flink \
-DarchetypeArtifactId=flink-quickstart-java \
-DarchetypeVersion=1.9.2
```
2. 現在maven工程已生成,用IDEA匯入這個工程,如下圖:
![在這裡插入圖片描述](https://img2020.cnblogs.com/other/485422/202011/485422-20201105112001332-11218706.png)
3. 以maven的型別匯入:
![在這裡插入圖片描述](https://img2020.cnblogs.com/other/485422/202011/485422-20201105112001759-1931546099.png)
4. 匯入成功的樣子:
![在這裡插入圖片描述](https://img2020.cnblogs.com/other/485422/202011/485422-20201105112002204-2035126840.png)
5. 專案建立成功,可以開始寫程式碼實戰了;
### 輔助類Splitter
實戰中有個功能常用到:將字串用空格分割,轉成Tuple2型別的集合,這裡將此運算元做成一個公共類Splitter.java,程式碼如下:
```java
package com.bolingcavalry;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
import org.apache.flink.util.StringUtils;
public class Splitter implements FlatMap