Flink增量快照
1 概述
Apache Flink是可以進行有狀態的流處理,然而,在流處理中什麼是狀態呢?狀態是有過去事件的在記憶體中的一些操作需要儲存,這些儲存的資訊會影響未來事件的處理。
狀態是基礎,可以在流出中處理很多比較複雜的場景,如下:
- 當應用需要搜尋一種特定的時間模式,那麼就需要儲存資料的次序狀態;
- 當聚合每分鐘的時間,狀態需要儲存未完成的聚合資訊;
- 當線上訓練一個機器學習模型是,需要當前模型版本的引數。
然而,有狀態的流處理只有在狀態可以進行容錯的時候才可用於生產環境, 容錯 意味著即使出現軟體或者硬體故障,計算結果也要保證準確,沒有出現數據丟失或者重複計算等情況。
在flink中通過chekpointing來實現容錯,checkpoint是一個全域性的,提供非同步快照機制,定期的對當前應用進行快照並存儲到可靠儲存上,當出現異常時,flink重啟應用,並使用最近完成的checkpoint作為起點。一些使用者實際可能儲存的狀態很大,佔用上GB空間,這種情況下checkpoint的建立會非常慢,而且執行時佔用的資源也比較多,從而提出 incremental checkpointing
,即增量方式。
在增量方式之前,每次都是進行全量的checkpoint,但是每次快照都是基於上次的更新,不會很大,所以使用增量方式只要保持上一次與當前的差距即可。
2 示例
當前,可以使用RocksDB來作為增量checkpoint的儲存,並在其中不是持續增大,可以進行定期合併清楚歷史狀態。

increment-checkpoint-example.png
該例子中,子任務的操作是一個keyed-state,一個checkpoint檔案儲存週期是可配置的,本例中是2,配置方式 state.checkpoints.num-retained
,上面展示了每次checkpoint時RocksDB示例中儲存的狀態以及檔案引用關係等。
- 對於checkpoint CP1,本地RocksDB目錄包含兩個磁碟檔案(sstable),它基於checkpoint的name來建立目錄。當完成checkpoint,將在共享登錄檔(shared state registry)中建立兩個實體並將其count置為1.在共享登錄檔中儲存的Key是由操作、子任務以及原始儲存名稱組成,同時登錄檔維護了一個Key到實際檔案儲存路徑的Map。
- 對於checkpoint CP2,RocksDB已經建立了兩個新的sstable檔案,老的兩個檔案也存在。在CP2階段,新的兩個生成新檔案,老的兩個引用原來的儲存。當checkpoint結束,所有引用檔案的count加1。
- 對於checkpoint CP3,RocksDB的compaction將sstable-(1),sstable-(2)以及sstable-(3)合併為sstable-(1,2,3),同時刪除了原始檔案。合併後的檔案包含原始檔案的所有信息,並刪除了重複的實體。除了該合併檔案,sstable-(4)還存在,同時有一個sstable-(5)創建出來。Flink將新的sstable-(1,2,3)和sstable-(5)儲存到底層,sstable-(4)引用CP2中的,並對相應引用次數count加1.老的CP1的checkpoint現在可以被刪除,由於其retained已達到2,作為刪除的一部分,Flink將所有CP1中的引用檔案count減1.
- 對於checkpoint CP4,RocksDB合併sstable-(4)、sstable-(5)以及新的sstable-(6)成sstable-(4,5,6)。Flink將該新的sstable儲存,並引用sstable-(1,2,3),並將sstable-(1,2,3)的count加1,刪除CP2中retained到2的。由於sstable-(1), sstable-(2), 和sstable-(3)降到了0,Flink將其從底層刪除。