1. 程式人生 > >Flink處理函式實戰之二:ProcessFunction類

Flink處理函式實戰之二:ProcessFunction類

### 歡迎訪問我的GitHub [https://github.com/zq2599/blog_demos](https://github.com/zq2599/blog_demos) 內容:所有原創文章分類彙總及配套原始碼,涉及Java、Docker、Kubernetes、DevOPS等; ### Flink處理函式實戰系列連結 1. [深入瞭解ProcessFunction的狀態操作(Flink-1.10)](https://blog.csdn.net/boling_cavalry/article/details/106040312); 2. [ProcessFunction](https://xinchen.blog.csdn.net/article/details/106299035); 3. [KeyedProcessFunction類](https://xinchen.blog.csdn.net/article/details/106299167); 4. [ProcessAllWindowFunction(視窗處理)](https://xinchen.blog.csdn.net/article/details/106453229); 5. [CoProcessFunction(雙流處理)](https://xinchen.blog.csdn.net/article/details/109614001); ### 關於處理函式(Process Function) 如下圖,在常規的業務開發中,SQL、Table API、DataStream API比較常用,處於Low-level的Porcession相對用得較少,從本章開始,我們一起通過實戰來熟悉處理函式(Process Function),看看這一系列的低階運算元可以帶給我們哪些能力? ![在這裡插入圖片描述](https://img2020.cnblogs.com/other/485422/202011/485422-20201120092347364-432672709.png) ### 關於ProcessFunction類 處理函式有很多種,最基礎的應該ProcessFunction類,來看看它的類圖,可見有RichFunction的特性open、close,然後自己有兩個重要的方法processElement和onTimer: ![在這裡插入圖片描述](https://img2020.cnblogs.com/other/485422/202011/485422-20201120092347965-2008655427.png) 常用特性如下所示: 1. 處理單個元素; 2. 訪問時間戳; 3. 旁路輸出; 接下來寫兩個應用體驗上述功能; ### 版本資訊 1. 開發環境作業系統:MacBook Pro 13寸, macOS Catalina 10.15.3 2. 開發工具:IDEA ULTIMATE 2018.3 3. JDK:1.8.0_211 4. Maven:3.6.0 5. Flink:1.9.2 ### 原始碼下載 如果您不想寫程式碼,整個系列的原始碼可在GitHub下載到,地址和連結資訊如下表所示(https://github.com/zq2599/blog_demos): | 名稱 | 連結 | 備註| | :-------- | :----| :----| | 專案主頁| https://github.com/zq2599/blog_demos | 該專案在GitHub上的主頁 | | git倉庫地址(https)| https://github.com/zq2599/blog_demos.git | 該專案原始碼的倉庫地址,https協議 | | git倉庫地址(ssh)| [email protected]:zq2599/blog_demos.git | 該專案原始碼的倉庫地址,ssh協議 | 這個git專案中有多個資料夾,本章的應用在flinkstudy資料夾下,如下圖紅框所示: ![在這裡插入圖片描述](https://img2020.cnblogs.com/other/485422/202011/485422-20201120092348447-1449527323.png) ### 建立工程 執行以下命令建立一個flink-1.9.2的應用工程: ```shell mvn \ archetype:generate \ -DarchetypeGroupId=org.apache.flink \ -DarchetypeArtifactId=flink-quickstart-java \ -DarchetypeVersion=1.9.2 ``` 按提示輸入groupId:com.bolingcavalry,architectid:flinkdemo ### 第一個demo 第一個demo用來體驗以下兩個特性: 1. 處理單個元素; 2. 訪問時間戳; 建立Simple.java,內容如下: ```java package com.bolingcavalry.processfunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.ProcessFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.util.Collector; public class Simple { public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); // 並行度為1 env.setParallelism(1); // 設定資料來源,一共三個元素 Da