1. 程式人生 > >硬核!八張圖搞懂 Flink 端到端精準一次處理語義 Exactly-once(深入原理,建議收藏)

硬核!八張圖搞懂 Flink 端到端精準一次處理語義 Exactly-once(深入原理,建議收藏)

### Flink 在 Flink 中需要端到端精準一次處理的位置有三個: ![Flink 端到端精準一次處理](https://cdn.jsdelivr.net/gh/sunmyuan/cdn/210130_8.png) - **Source 端**:資料從上一階段進入到 Flink 時,需要保證訊息精準一次消費。 - **Flink 內部端**:這個我們已經瞭解,利用 Checkpoint 機制,把狀態存檔,發生故障的時候可以恢復,保證內部的狀態一致性。不瞭解的小夥伴可以看下我之前的文章: [Flink可靠性的基石-checkpoint機制詳細解析](https://mp.weixin.qq.com/s?__biz=Mzg2MzU2MDYzOA==&mid=2247483947&idx=1&sn=adae434f4e32b31be51627888e7d9f76&chksm=ce77f4faf9007decd2f78a788a89e6777bb7bec79f4e59093474532ca5cf774284e2fe35e1bd&token=1679639512&lang=zh_CN#rd) - **Sink 端**:將處理完的資料傳送到下一階段時,需要保證資料能夠準確無誤傳送到下一階段。 在 Flink 1.4 版本之前,精準一次處理只限於 Flink 應用內,也就是所有的 Operator 完全由 Flink 狀態儲存並管理的才能實現精確一次處理。但 Flink 處理完資料後大多需要將結果傳送到外部系統,比如 Sink 到 Kafka 中,這個過程中 Flink 並不保證精準一次處理。 在 Flink 1.4 版本正式引入了一個里程碑式的功能:兩階段提交 Sink,即 TwoPhaseCommitSinkFunction 函式。該 SinkFunction 提取並封裝**了兩階段提交協議**中的公共邏輯,自此 Flink 搭配特定 Source 和 Sink(如 Kafka 0.11 版)**實現精確一次處理語義**(英文簡稱:EOS,即 Exactly-Once Semantics)。 ### 端到端精準一次處理語義(EOS) **以下內容適用於 Flink 1.4 及之後版本** **對於 Source 端**:Source 端的精準一次處理比較簡單,畢竟資料是落到 Flink 中,所以 Flink 只需要儲存消費資料的偏移量即可, 如消費 Kafka 中的資料,Flink 將 Kafka Consumer 作為 Source,可以將偏移量儲存下來,如果後續任務出現了故障,恢復的時候可以由聯結器重置偏移量,重新消費資料,保證一致性。 **對於 Sink 端**:**Sink 端是最複雜的**,因為資料是落地到其他系統上的,資料一旦離開 Flink 之後,Flink 就監控不到這些資料了,所以精準一次處理語義必須也要應用於 Flink 寫入資料的外部系統,故這些外部系統必須提供一種手段允許提交或回滾這些寫入操作,同時還要保證與 Flink Checkpoint 能夠協調使用(Kafka 0.11 版本已經實現精確一次處理語義)。 我們以 Flink 與 Kafka 組合為例,Flink 從 Kafka 中讀資料,處理完的資料在寫入 Kafka 中。 為什麼以Kafka為例,第一個原因是目前大多數的 Flink 系統讀寫資料都是與 Kafka 系統進行的。第二個原因,也是**最重要的原因 Kafka 0.11 版本正式釋出了對於事務的支援,這是與Kafka互動的Flink應用要實現端到端精準一次語義的必要條件**。 當然,Flink 支援這種精準一次處理語義並不只是限於與 Kafka 的結合,可以使用任何 Source/Sink,只要它們提供了必要的協調機制。 ### Flink 與 Kafka 組合 ![Flink 應用示例](https://cdn.jsdelivr.net/gh/sunmyuan/cdn/210130_1.png) 如上圖所示,Flink 中包含以下元件: 1. 一個 Source,從 Kafka 中讀取資料(即 KafkaConsumer) 2. 一個時間視窗化的聚會操作(Window) 3. 一個 Sink,將結果寫入到 Kafka(即 KafkaProducer) **若要 Sink 支援精準一次處理語義(EOS),它必須以事務的方式寫資料到 Kafka**,這樣當提交事務時兩次 Checkpoint 間的所有寫入操作當作為一個事務被提交。這確保了出現故障或崩潰時這些寫入操作能夠被回滾。 當然了,**在一個分散式且含有多個併發執行 Sink 的應用中,僅僅執行單次提交或回滾是不夠的,因為所有元件都必須對這些提交或回滾達成共識,這樣才能保證得到一個一致性的結果。Flink 使用兩階段提交協議以及預提交(Pre-commit)階段來解決這個問題**。 ### 兩階段提交協議(2PC) **兩階段提交協議(Two-Phase Commit,2PC)是很常用的解決分散式事務問題的方式,它可以保證在分散式事務中,要麼所有參與程序都提交事務,要麼都取消,即實現 ACID 中的 A (原子性)**。 在資料一致性的環境下,其代表的含義是:要麼所有備份資料同時更改某個數值,要麼都不改,以此來達到資料的**強一致性**。 **兩階段提交協議中有兩個重要角色,協調者(Coordinator)和參與者(Participant),其中協調者只有一個,起到分散式事務的協調管理作用,參與者有多個**。 顧名思義,兩階段提交將提交過程劃分為連續的兩個階段:**表決階段(Voting)和提交階段(Commit)**。 兩階段提交協議過程如下圖所示: ![兩階段提交協議](https://cdn.jsdelivr.net/gh/sunmyuan/cdn/210130_2.png) **第一階段:表決階段** 1. 協調者向所有參與者傳送一個 VOTE_REQUEST 訊息。 2. 當參與者接收到 VOTE\_REQUEST 訊息,向協調者傳送 VOTE\_COMMIT 訊息作為迴應,告訴協調者自己已經做好準備提交準備,如果參與者沒有準備好或遇到其他故障,就返回一個 VOTE_ABORT 訊息,告訴協調者目前無法提交事務。 **第二階段:提交階段** 1. 協調者收集來自各個參與者的表決訊息。如果**所有參與者一致認為可以提交事務,那麼協調者決定事務的最終提交**,在此情形下協調者向所有參與者傳送一個 GLOBAL\_COMMIT 訊息,通知參與者進行本地提交;如果所有參與者中有**任意一個返回訊息是 VOTE_ABORT,協調者就會取消事務**,向所有參與者廣播一條 GLOBAL_ABORT 訊息通知所有的參與者取消事務。 2. 每個提交了表決資訊的參與者等候協調者返回訊息,如果參與者接收到一個 GLOBAL\_COMMIT 訊息,那麼參與者提交本地事務,否則如果接收到 GLOBAL_ABORT 訊息,則參與者取消本地事務。 ### 兩階段提交協議在 Flink 中的應用 **Flink 的兩階段提交思路**: 我們從 Flink 程式啟動到消費 Kafka 資料,最後到 Flink 將資料 Sink 到 Kafka 為止,來分析 Flink 的精準一次處理。 1. 當 Checkpoint 啟動時,JobManager 會將檢查點分界線(checkpoint battier)注入資料流,checkpoint barrier 會在運算元間傳遞下去,如下如所示: ![Flink 精準一次處理:Checkpoint 啟動](https://cdn.jsdelivr.net/gh/sunmyuan/cdn/210130_3.png) 2. **Source 端**:**Flink Kafka Source 負責儲存 Kafka 消費 offset**,當 Chckpoint 成功時 Flink 負責提交這些寫入,否則就終止取消掉它們,當 Chckpoint 完成位移儲存,它會將 checkpoint barrier(檢查點分界線) 傳給下一個 Operator,然後每個運算元會對當前的狀態做個快照,**儲存到狀態後端**(State Backend)。 **對於 Source 任務而言,就會把當前的 offset 作為狀態儲存起來。下次從 Checkpoint 恢復時,Source 任務可以重新提交偏移量,從上次儲存的位置開始重新消費資料**,如下圖所示: ![Flink 精準一次處理:checkpoint barrier 及 offset 儲存](https://cdn.jsdelivr.net/gh/sunmyuan/cdn/210130_4_1.png) 3. **Slink 端**:從 Source 端開始,每個內部的 transform 任務遇到 checkpoint barrier(檢查點分界線)時,都會把狀態存到 Checkpoint 裡。資料處理完畢到 Sink 端時,Sink 任務首先把資料寫入外部 Kafka,這些資料都屬於預提交的事務(還不能被消費),**此時的 Pre-commit 預提交階段下 Data Sink 在儲存狀態到狀態後端的同時還必須預提交它的外部事務**,如下圖所示: ![Flink 精準一次處理:預提交到外部系統](https://cdn.jsdelivr.net/gh/sunmyuan/cdn/210130_5_1.png) 4. **當所有運算元任務的快照完成**(所有建立的快照都被視為是 Checkpoint 的一部分),**也就是這次的 Checkpoint 完成時,JobManager 會向所有任務發通知,確認這次 Checkpoint 完成,此時 Pre-commit 預提交階段才算完成**。才正式到**兩階段提交協議的第二個階段:commit 階段**。該階段中 JobManager 會為應用中每個 Operator 發起 Checkpoint 已完成的回撥邏輯。 本例中的 Data Source 和視窗操作無外部狀態,因此在該階段,這兩個 Opeartor 無需執行任何邏輯,但是 **Data Sink 是有外部狀態的,此時我們必須提交外部事務**,當 Sink 任務收到確認通知,就會正式提交之前的事務,Kafka 中未確認的資料就改為“已確認”,資料就真正可以被消費了,如下圖所示: ![Flink 精準一次處理:資料精準被消費](https://cdn.jsdelivr.net/gh/sunmyuan/cdn/210130_6_1.png) > 注:Flink 由 JobManager 協調各個 TaskManager 進行 Checkpoint 儲存,Checkpoint 儲存在 StateBackend(狀態後端) 中,預設 StateBackend 是記憶體級的,也可以改為檔案級的進行持久化儲存。 最後,一張圖總結下 Flink 的 EOS: ![Flink 端到端精準一次處理](https://cdn.jsdelivr.net/gh/sunmyuan/cdn/210130_7.png) **此圖建議儲存,總結全面且簡明扼要,再也不慫面試官!** > 文章首發於公眾號【五分鐘學大資料】,更多好文歡迎搜尋關注,公眾號回覆【面試】送你一份大資料面試