640?wx_fmt=png

前面,已經有一篇文章講解了spark的checkpoint:

同時,浪尖也在知識星球裡發了原始碼解析的文章。spark streaming的Checkpoint僅僅是針對driver的故障恢復做了資料和元資料的Checkpoint。而本文要講的flink的checkpoint機制要複雜了很多,它採用的是輕量級的分散式快照,實現了每個操作符的快照,及迴圈流的在迴圈的資料的快照。詳細的演算法後面浪尖會給出文章。

歡迎點選閱讀原文,加入浪尖知識星球,更深入學習spark等大資料知識。

640?wx_fmt=png640?wx_fmt=png1. 簡介

Apache Flink提供容錯機制,以持續恢復資料流應用程式的狀態。該機制確保即使存在故障,程式的每條記錄只會作用於狀態一次(exactly-once),當然也可以降級為至少一次(at-least-once)。

容錯機制持續地製作分散式流資料流的快照。對於狀態較小的流應用程式,這些快照非常輕量級,可以頻繁產生快照,而不會對效能產生太大影響。流應用程式的狀態儲存的位置是可以配置的(例如儲存在master節點或HDFS)。

如果程式失敗(由於機器,網路或軟體故障),Flink將停止分散式資料流。然後,系統重新啟動操作運算元並將其重置為最新的成功checkpoint。輸入流將重置為狀態快照記錄的位置。 作為重新啟動的並行資料流的一部分被處理的任何記錄都保證不會成為先前checkpoint狀態的一部分。

注意:預設情況下,禁用checkpoint。

注意:要使容錯機制完整,資料來源(如訊息佇列或者broker)要支援資料回滾到歷史記錄的位置。 Apache Kafka具有這種能力,Flink與Kafka的聯結器利用了該功能。

注意:由於Flink的checkpoint是通過分散式快照實現的,因此快照和checkpoint的概念可以互換使用。

640?wx_fmt=png2. Checkpointing

Flink的容錯機制的核心部分是製作分散式資料流和操作運算元狀態的一致性快照。 這些快照充當一致性checkpoint,系統可以在發生故障時回滾。 Flink用於製作這些快照的機制在“分散式資料流的輕量級非同步快照”中進行了描述。 它受到分散式快照的標準Chandy-Lamport演算法的啟發,專門針對Flink的執行模型而定製。

640?wx_fmt=png2.1 Barriers

Flink分散式快照的核心概念之一是barriers。 這些barriers被注入資料流並與記錄一起作為資料流的一部分向下流動。 barriers永遠不會超過記錄,資料流嚴格有序。 barriers將資料流中的記錄分為進入當前快照的記錄和進入下一個快照的記錄。每個barriers都帶有快照的ID,並且barriers之前的記錄都進入了該快照。 barriers不會中斷流的流動,非常輕量級。 來自不同快照的多個barriers可以同時在流中出現,這意味著可以同時發生各種快照。

640?wx_fmt=png

barriers在資料流源處被注入並行資料流中。快照n的barriers被插入的位置(我們稱之為Sn)是快照所包含的資料在資料來源中最大位置。例如,在Apache Kafka中,此位置將是分割槽中最後一條記錄的偏移量。 將該位置Sn報告給checkpoint協調器(Flink的JobManager)。

然後barriers向下遊流動。當一箇中間操作運算元從其所有輸入流中收到快照n的barriers時,它會為快照n發出barriers進入其所有輸出流中。 一旦sink操作運算元(流式DAG的末端)從其所有輸入流接收到barriers n,它就向checkpoint協調器確認快照n完成。在所有sink確認快照後,意味快照著已完成。

一旦完成快照n,job將永遠不再向資料來源請求Sn之前的記錄,因為此時這些記錄(及其後續記錄)將已經通過整個資料流拓撲,也即是已經被處理結束啦。

640?wx_fmt=png

接收多個輸入流的運算子需要基於快照barriers對齊輸入流。 上圖說明了這一點:

  • 一旦操作運算元從一個輸入流接收到快照barriers n,它就不能處理來自該流的任何記錄,直到它從其他輸入接收到barriers n為止。 否則,它會搞混屬於快照n的記錄和屬於快照n + 1的記錄。

  • barriers n所屬的流暫時會被擱置。 從這些流接收的記錄不會被處理,而是放入輸入緩衝區。

  • 一旦從最後一個流接收到barriers n,操作運算元就會發出所有掛起的向後傳送的記錄,然後自己發出快照n的barriers。

  • 之後,它恢復處理來自所有輸入流的記錄,在處理來自流的記錄之前優先處理來自輸入緩衝區的記錄。

640?wx_fmt=png2.2 state

當運算子包含任何形式的狀態時,此狀態也必須是快照的一部分。操作運算元狀態有不同的形式:

使用者定義的狀態:這是由轉換函式(如map()或filter())直接建立和修改的狀態。

系統狀態:此狀態是指作為運算子計算一部分的資料緩衝區。此狀態的典型示例是視窗緩衝區,系統在其中收集(和聚合)窗口裡的記錄,直到視窗被計算和拋棄。

操作運算元在他們從輸入流接收到所有快照barriers時,以及在向其輸出流發出barriers之前,會對其狀態進行寫快照。此時,在 barrier 之前的資料對狀態的更新已經完成,barrier 之後的資料不會更新狀態。 由於快照的狀態可能很大,因此它儲存在可配置的狀態後端中。預設情況下,是儲存到JobManager的記憶體,但對於生產使用,應配置分散式可靠儲存(例如HDFS)。 在儲存狀態之後,操作運算元確認checkpoint完成,將快照barriers傳送到輸出流中,然後繼續。

生成的快照現在包含:

  • 對於每個並行流資料來源,建立快照時流中的偏移/位置

  • 對於每個運算子,儲存在快照中的狀態指標

640?wx_fmt=png

640?wx_fmt=png2.3 Exactly Once vs. At Least Once

對齊步驟可能增加流式程式的等待時間。通常,這種額外的延遲大約為幾毫秒,但也會見到一些延遲顯著增加的情況。 對於要求所有記錄始終具有超低延遲(幾毫秒)的應用程式,Flink可以在checkpoint期間跳過流對齊。一旦操作運算元看到每個輸入流的checkpoint barriers,就會寫 checkpoint 快照。

當跳過對齊時,即使在 checkpoint n 的某些 checkpoint barriers 到達之後,操作運算元仍繼續處理所有輸入。這樣,操作運算元還可以在建立 checkpoint n 的狀態快照之前,繼續處理屬於checkpoint n + 1的資料。 在還原時,這些記錄將作為重複記錄出現,因為它們都包含在 checkpoint n 的狀態快照中,並將作為 checkpoint n 之後資料的一部分進行重複處理。

注意:對齊僅適用於具有多個輸入(join)的運算子以及具有多個輸出的運算子(在流重新分割槽/shuffle之後)。 正因為如此,對於只有map(),flatMap(),filter()等操作,實際上即使在至少一次模式下也能提供一次保證。

640?wx_fmt=png2.4 非同步狀態快照

注意,上述機制意味著操作運算元在將狀態的快照儲存在狀態後端時,停止處理輸入記錄。每次寫快照時,這種同步狀態快照操作都會引入延遲。

可以讓操作運算元在儲存狀態快照時繼續處理,高效地讓狀態快照儲存在後臺非同步發生。為此,操作運算元必須能夠生成一個狀態物件,該狀態物件應以某種方式儲存,以便對操作運算元狀態的進一步修改不會影響該狀態物件。 例如,RocksDB中使用的寫時複製(copy-on-write)資料結構具有這種能力。

在接收到輸入的checkpoint的barriers後,操作運算元啟動其狀態的非同步快照複製。它立即釋放其barriers到輸出,並繼續進行常規流處理。後臺複製過程完成後,它會向checkpoint協調器(JobManager)確認checkpoint完成。 checkpoint僅在所有sink都已收到barriers並且所有有狀態操作運算元已確認其完成備份(可能在barriers到達sink之後)之後才算完成。

640?wx_fmt=png2.5 Recovery

在這種機制下的恢復是很直接的:當失敗時,Flink選擇最新完成的checkpoint k。 然後,系統重新部署整個分散式資料流,併為每個操作運算元重置作為checkpoint k的一部分的快照的狀態。 資料來源設定為從位置Sk開始讀取。 例如在Apache Kafka中,這意味著告訴消費者從偏移量Sk開始讀取。

如果狀態以遞增方式寫快照,則操作運算元從最新完整快照的狀態開始,然後對該狀態應用一系列增量快照更新。

640?wx_fmt=png2.6 操作運算元快照的實現

在建立操作運算元快照時,有兩部分:同步部分和非同步部分。

操作運算元和狀態後端將其快照提供為Java FutureTask。 該任務包含同步部分已完成且非同步部分處於掛起狀態的狀態。 然後,非同步部分由該checkpoint的後臺執行緒執行。

完全同步的checkpoint返回已經完成的FutureTask的運算子。 如果需要執行非同步操作,則在FutureTask的run()方法中執行。

任務是可取消的,可以釋放流和其他資源消耗的控制代碼。

推薦閱讀:

640?wx_fmt=png