1. 程式人生 > >Flink的DataSource三部曲之三:自定義

Flink的DataSource三部曲之三:自定義

### 歡迎訪問我的GitHub [https://github.com/zq2599/blog_demos](https://github.com/zq2599/blog_demos) 內容:所有原創文章分類彙總及配套原始碼,涉及Java、Docker、Kubernetes、DevOPS等; ### 本篇概覽 本文是《Flink的DataSource三部曲》的終篇,前面都是在學習Flink已有的資料來源功能,但如果這些不能滿足需要,就要自定義資料來源(例如從資料庫獲取資料),也就是今天實戰的內容,如下圖紅框所示: ![在這裡插入圖片描述](https://img2020.cnblogs.com/other/485422/202011/485422-20201107100142690-985742068.png) ### Flink的DataSource三部曲文章連結 1. [《Flink的DataSource三部曲之一:直接API》](https://blog.csdn.net/boling_cavalry/article/details/105467076) 2. [《Flink的DataSource三部曲之二:內建connector》](https://blog.csdn.net/boling_cavalry/article/details/105471798) 3. [《Flink的DataSource三部曲之三:自定義》](https://blog.csdn.net/boling_cavalry/article/details/105472218) ### 環境和版本 本次實戰的環境和版本如下: 1. JDK:1.8.0_211 2. Flink:1.9.2 3. Maven:3.6.0 4. 作業系統:macOS Catalina 10.15.3 (MacBook Pro 13-inch, 2018) 5. IDEA:2018.3.5 (Ultimate Edition) ### 在伺服器上搭建Flink服務 1. 前面兩章的程式都是在IDEA上執行的,本章需要通過Flink的web ui觀察執行結果,因此要單獨部署Flink服務,我這裡是在CentOS環境通過docker-compose部署的,以下是docker-compose.yml的內容,用於參考: ```yml version: "2.1" services: jobmanager: image: flink:1.9.2-scala_2.12 expose: - "6123" ports: - "8081:8081" command: jobmanager environment: - JOB_MANAGER_RPC_ADDRESS=jobmanager taskmanager1: image: flink:1.9.2-scala_2.12 expose: - "6121" - "6122" depends_on: - jobmanager command: taskmanager links: - "jobmanager:jobmanager" environment: - JOB_MANAGER_RPC_ADDRESS=jobmanager taskmanager2: image: flink:1.9.2-scala_2.12 expose: - "6121" - "6122" depends_on: - jobmanager command: taskmanager links: - "jobmanager:jobmanager" environment: - JOB_MANAGER_RPC_ADDRESS=jobmanager ``` 2. 下圖是我的Flink情況,有兩個Task Maganer,共八個Slot全部可用: ![在這裡插入圖片描述](https://img2020.cnblogs.com/other/485422/202011/485422-20201107100142972-113772455.png) ### 原始碼下載 如果您不想寫程式碼,整個系列的原始碼可在GitHub下載到,地址和連結資訊如下表所示(https://github.com/zq2599/blog_demos): | 名稱 | 連結 | 備註| | :-------- | :----| :----| | 專案主頁| https://github.com/zq2599/blog_demos | 該專案在GitHub上的主頁 | | git倉庫地址(https)| https://github.com/zq2599/blog_demos.git | 該專案原始碼的倉庫地址,https協議 | | git倉庫地址(ssh)| [email protected]:zq2599/blog_demos.git | 該專案原始碼的倉庫地址,ssh協議 | 這個git專案中有多個資料夾,本章的應用在flinkdatasourcedemo資料夾下,如下圖紅框所示: ![在這裡插入圖片描述](https://img2020.cnblogs.com/other/485422/202011/485422-20201107100143262-44184397.png) 準備完畢,開始開發; ### 實現SourceFunctionDemo介面的DataSource 1. 從最簡單的開始,開發一個不可並行的資料來源並驗證; 2. 實現SourceFunction介面,在工程flinkdatasourcedemo中增加SourceFunctionDemo.java: ```java package com.bolingcavalry.customize; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.windowing.time.Time; public class SourceFunctionDemo { public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //並行度為2 env.setParallelism(2); Da