CoProcessFunction實戰三部曲之三:定時器和側輸出
阿新 • • 發佈:2020-12-07
### 歡迎訪問我的GitHub
[https://github.com/zq2599/blog_demos](https://github.com/zq2599/blog_demos)
內容:所有原創文章分類彙總及配套原始碼,涉及Java、Docker、Kubernetes、DevOPS等;
### 系列文章連結
1. [基本功能](https://xinchen.blog.csdn.net/article/details/109624375)
2. [狀態處理](https://xinchen.blog.csdn.net/article/details/109629119)
3. [定時器和側輸出](https://xinchen.blog.csdn.net/article/details/109645214)
### 本篇概覽
- 本文是《CoProcessFunction實戰三部曲》的終篇,主要內容是在CoProcessFunction中使用定時器和側輸出,對上一篇的功能進行增強;
- 回顧上一篇的功能:一號流收到aaa後儲存在狀態中,直到二號流收到aaa,把兩個aaa的值相加後輸出到下游;
- 上述功能有個問題:二號流如果一直收不到aaa,下游就一直沒有aaa的輸出,相當於進入一號流的aaa已經石沉大海了;
- 今天的實戰就是修復上述問題:aaa在一個流中出現後,10秒之內如果出現在另一個流中,就像以前那樣值相加,輸出到下游,如果10秒內沒有出現在另一個流,就流向側輸出,再將所有狀態清理乾淨;
### 參考文章
1. 理解狀態:[《深入瞭解ProcessFunction的狀態操作(Flink-1.10)》](https://xinchen.blog.csdn.net/article/details/106040312)
2. 理解定時器:[《理解ProcessFunction的Timer邏輯》](https://xinchen.blog.csdn.net/article/details/109564999)
### 梳理流程
- 為了編碼的邏輯正確,咱們把正常和異常的流程先梳理清楚;
- 下圖是正常流程:aaa在一號流出現後,10秒內又在二號流出現了,於是相加並流向下遊:
![在這裡插入圖片描述](https://img2020.cnblogs.com/other/485422/202012/485422-20201207073418461-917425372.png)
- 再來看異常的流程,如下圖,一號流在16:14:01收到aaa,但二號流一直沒有收到aaa,等到10秒後,也就是16:14:11,定時器被觸發,從狀態1得知10秒前一號流收到過aaa,於是將資料流向一號側輸出:
![在這裡插入圖片描述](https://img2020.cnblogs.com/other/485422/202012/485422-20201207073418704-1654788180.png)
- 接下來編碼實現上面的功能;
### 原始碼下載
如果您不想寫程式碼,整個系列的原始碼可在GitHub下載到,地址和連結資訊如下表所示(https://github.com/zq2599/blog_demos):
| 名稱 | 連結 | 備註|
| :-------- | :----| :----|
| 專案主頁| https://github.com/zq2599/blog_demos | 該專案在GitHub上的主頁 |
| git倉庫地址(https)| https://github.com/zq2599/blog_demos.git | 該專案原始碼的倉庫地址,https協議 |
| git倉庫地址(ssh)| [email protected]:zq2599/blog_demos.git | 該專案原始碼的倉庫地址,ssh協議 |
這個git專案中有多個資料夾,本章的應用在flinkstudy資料夾下,如下圖紅框所示:
![在這裡插入圖片描述](https://img2020.cnblogs.com/other/485422/202012/485422-20201207073420708-744335415.png)
### CoProcessFunction的子類
1. 前面的兩篇實戰中,CoProcessFunction的子類都寫成了匿名類,如下圖紅框:
![在這裡插入圖片描述](https://img2020.cnblogs.com/other/485422/202012/485422-20201207073420994-231561697.png)
2. 本文中,CoProcessFunction子類會用到外部類的成員變數,因此不能再用匿名類了,新增CoProcessFunction的子類ExecuteWithTimeoutCoProcessFunction.java,稍後會說明幾個關鍵點:
```java
package com.bolingcavalry.coprocessfunction;
import com.bolingcavalry.Utils;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* 實現雙流業務邏輯的功能類
*/
public class ExecuteWithTimeoutCoProcessFunction extends CoProcess