1. 程式人生 > >CoProcessFunction實戰三部曲之三:定時器和側輸出

CoProcessFunction實戰三部曲之三:定時器和側輸出

### 歡迎訪問我的GitHub [https://github.com/zq2599/blog_demos](https://github.com/zq2599/blog_demos) 內容:所有原創文章分類彙總及配套原始碼,涉及Java、Docker、Kubernetes、DevOPS等; ### 系列文章連結 1. [基本功能](https://xinchen.blog.csdn.net/article/details/109624375) 2. [狀態處理](https://xinchen.blog.csdn.net/article/details/109629119) 3. [定時器和側輸出](https://xinchen.blog.csdn.net/article/details/109645214) ### 本篇概覽 - 本文是《CoProcessFunction實戰三部曲》的終篇,主要內容是在CoProcessFunction中使用定時器和側輸出,對上一篇的功能進行增強; - 回顧上一篇的功能:一號流收到aaa後儲存在狀態中,直到二號流收到aaa,把兩個aaa的值相加後輸出到下游; - 上述功能有個問題:二號流如果一直收不到aaa,下游就一直沒有aaa的輸出,相當於進入一號流的aaa已經石沉大海了; - 今天的實戰就是修復上述問題:aaa在一個流中出現後,10秒之內如果出現在另一個流中,就像以前那樣值相加,輸出到下游,如果10秒內沒有出現在另一個流,就流向側輸出,再將所有狀態清理乾淨; ### 參考文章 1. 理解狀態:[《深入瞭解ProcessFunction的狀態操作(Flink-1.10)》](https://xinchen.blog.csdn.net/article/details/106040312) 2. 理解定時器:[《理解ProcessFunction的Timer邏輯》](https://xinchen.blog.csdn.net/article/details/109564999) ### 梳理流程 - 為了編碼的邏輯正確,咱們把正常和異常的流程先梳理清楚; - 下圖是正常流程:aaa在一號流出現後,10秒內又在二號流出現了,於是相加並流向下遊: ![在這裡插入圖片描述](https://img2020.cnblogs.com/other/485422/202012/485422-20201207073418461-917425372.png) - 再來看異常的流程,如下圖,一號流在16:14:01收到aaa,但二號流一直沒有收到aaa,等到10秒後,也就是16:14:11,定時器被觸發,從狀態1得知10秒前一號流收到過aaa,於是將資料流向一號側輸出: ![在這裡插入圖片描述](https://img2020.cnblogs.com/other/485422/202012/485422-20201207073418704-1654788180.png) - 接下來編碼實現上面的功能; ### 原始碼下載 如果您不想寫程式碼,整個系列的原始碼可在GitHub下載到,地址和連結資訊如下表所示(https://github.com/zq2599/blog_demos): | 名稱 | 連結 | 備註| | :-------- | :----| :----| | 專案主頁| https://github.com/zq2599/blog_demos | 該專案在GitHub上的主頁 | | git倉庫地址(https)| https://github.com/zq2599/blog_demos.git | 該專案原始碼的倉庫地址,https協議 | | git倉庫地址(ssh)| [email protected]:zq2599/blog_demos.git | 該專案原始碼的倉庫地址,ssh協議 | 這個git專案中有多個資料夾,本章的應用在flinkstudy資料夾下,如下圖紅框所示: ![在這裡插入圖片描述](https://img2020.cnblogs.com/other/485422/202012/485422-20201207073420708-744335415.png) ### CoProcessFunction的子類 1. 前面的兩篇實戰中,CoProcessFunction的子類都寫成了匿名類,如下圖紅框: ![在這裡插入圖片描述](https://img2020.cnblogs.com/other/485422/202012/485422-20201207073420994-231561697.png) 2. 本文中,CoProcessFunction子類會用到外部類的成員變數,因此不能再用匿名類了,新增CoProcessFunction的子類ExecuteWithTimeoutCoProcessFunction.java,稍後會說明幾個關鍵點: ```java package com.bolingcavalry.coprocessfunction; import com.bolingcavalry.Utils; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.co.CoProcessFunction; import org.apache.flink.util.Collector; import org.apache.flink.util.OutputTag; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * 實現雙流業務邏輯的功能類 */ public class ExecuteWithTimeoutCoProcessFunction extends CoProcess