1. 程式人生 > >CoProcessFunction實戰三部曲之一:基本功能

CoProcessFunction實戰三部曲之一:基本功能

### 歡迎訪問我的GitHub [https://github.com/zq2599/blog_demos](https://github.com/zq2599/blog_demos) 內容:所有原創文章分類彙總及配套原始碼,涉及Java、Docker、Kubernetes、DevOPS等; ### 關於《CoProcessFunction實戰三部曲》系列 - 《CoProcessFunction實戰三部曲》旨在通過三次實戰,由淺入深的學習和掌握Flink低階處理函式CoProcessFunction的用法; - 整個系列的開篇先介紹CoProcessFunction,然後迅速進入實戰,瞭解CoProcessFunction的基本功能; - 下一篇會結合狀態,讓雙流元素的處理彼此保持關係; - 終篇的實戰會加入定時器功能,確保同一個key的資料在雙流場景下能夠及時處理; ### 版本資訊 1. 開發環境作業系統:MacBook Pro 13寸, macOS Catalina 10.15.3 2. 開發工具:IDEA ULTIMATE 2018.3 3. JDK:1.8.0_211 4. Maven:3.6.0 5. Flink:1.9.2 ### 系列文章連結 1. [基本功能](https://xinchen.blog.csdn.net/article/details/109624375) 2. [狀態處理](https://xinchen.blog.csdn.net/article/details/109629119) 3. [定時器和側輸出](https://xinchen.blog.csdn.net/article/details/109645214) ### 關於CoProcessFunction - CoProcessFunction的作用是同時處理兩個資料來源的資料; - 試想在面對兩個輸入流時,如果這兩個流的資料之間有業務關係,該如何編碼實現呢,例如下圖中的操作,同時監聽9998和9999埠,將收到的輸出分別處理後,再由同一個sink處理(列印): ![在這裡插入圖片描述](https://img2020.cnblogs.com/other/485422/202011/485422-20201130075837289-720933326.png) - Flink支援的方式是擴充套件CoProcessFunction來處理,為了更清楚認識,我們把KeyedProcessFunction和CoProcessFunction的類圖擺在一起看,如下所示: ![在這裡插入圖片描述](https://img2020.cnblogs.com/other/485422/202011/485422-20201130075837678-1470633056.png) - 從上圖可見,CoProcessFunction和KeyedProcessFunction的繼承關係一樣,另外CoProcessFunction自身也很簡單,在processElement1和processElement2中分別處理兩個上游流入的資料即可,並且也支援定時器設定; ### 本篇實戰功能簡介 本篇咱們要開發的應用,其功能非常簡單,描述如下: 1. 建兩個資料來源,資料分別來自本地9998和9999埠; 2. 每個埠收到類似aaa,123這樣的資料,轉成Tuple2例項,f0是aaa,f1是123; 3. 在CoProcessFunction的實現類中,對每個資料來源的資料都打日誌,然後全部傳到下游運算元; 4. 下游操作是列印,因此9998和9999埠收到的所有資料都會在控制檯打印出來; 5. 整個demo的功能如下圖所示: ![在這裡插入圖片描述](https://img2020.cnblogs.com/other/485422/202011/485422-20201130075837972-465920736.png) - 接下來開始編碼; ### 原始碼下載 如果您不想寫程式碼,整個系列的原始碼可在GitHub下載到,地址和連結資訊如下表所示(https://github.com/zq2599/blog_demos): | 名稱 | 連結 | 備註| | :-------- | :----| :----| | 專案主頁| https://github.com/zq2599/blog_demos | 該專案在GitHub上的主頁 | | git倉庫地址(https)| https://github.com/zq2599/blog_demos.git | 該專案原始碼的倉庫地址,https協議 | | git倉庫地址(ssh)| [email protected]:zq2599/blog_demos.git | 該專案原始碼的倉庫地址,ssh協議 | 這個git專案中有多個資料夾,本章的應用在flinkstudy資料夾下,如下圖紅框所示: ![在這裡插入圖片描述](https://img2020.cnblogs.com/other/485422/202011/485422-20201130075838383-357010033.png) ### 程式碼簡介 1. 開發一個Map運算元,將字串轉成Tuple2; 2. 再開發抽象類AbstractCoProcessFunctionExecutor,功能包括:flink啟動、監聽埠、呼叫運算元處理資料、雙流連線、將雙流處理結果打印出來; 3. 從上面的描述可見,AbstractCoProcessFunctionExecutor做了很多事情,唯獨沒有實現雙流連線後的具體業務邏輯,這些沒有做的是留給子類來實現的,整個三部曲系列的重點都集中在AbstractCoProcessFunctionExecutor的子類上,把雙流連線後的業務邏輯做好,如下圖所示,紅色為CoProcessFunction的業務程式碼,其他的都在抽象類中完成: ![在這裡插入圖片描述](https://img2020.cnblogs.com/other/485422/202011/485422-20201130075838646-1963831744.png) ### Map運算元 1. 做一個map運算元,用來將字串aaa,123轉成Tuple2例項,f0是aaa,f1是123; 2. 運算元名為WordCountMap.java: ```java package com.bolingcavalry.coprocessfunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.util.StringUtils; public class WordCountMap implements Map