Checkpoint的運行原理和源碼實現
引言
Checkpoint 到底是什麽和需要用 Checkpoint 解決什麽問題:
- Spark 在生產環境下經常會面臨 Transformation 的 RDD 非常多(例如一個Job 中包含1萬個RDD) 或者是具體的 Transformation 產生的 RDD 本身計算特別復雜和耗時(例如計算時常超過1個小時) , 可能業務比較復雜,此時我們必需考慮對計算結果的持久化。
- Spark 是擅長多步驟叠代,同時擅長基於 Job 的復用。這個時候如果可以對計算的過程進行復用,就可以極大的提升效率。因為有時候有共同的步驟,就可以免卻重復計算的時間。
- 如果采用 persists 把數據在內存中的話,雖然最快速但是也是最不可靠的;如果放在磁盤上也不是完全可靠的,例如磁盤會損壞,系統管理員可能會清空磁盤。
- Checkpoint 的產生就是為了相對而言更加可靠的持久化數據,在 Checkpoint 可以指定把數據放在本地並且是多副本的方式,但是在正常生產環境下放在 HDFS 上,這就天然的借助HDFS 高可靠的特征來完成最大化的可靠的持久化數據的方式。
- Checkpoint 是為了最大程度保證絕對可靠的復用 RDD 計算數據的 Spark 的高級功能,通過 Checkpoint 我們通過把數據持久化到 HDFS 上來保證數據的最大程度的安任性
- Checkpoint 就是針對整個RDD 計算鏈條中特別需要數據持久化的環節(後面會反覆使用當前環節的RDD) 開始基於HDFS 等的數據持久化復用策略,通過對 RDD 啟動 Checkpoint 機制來實現容錯和高可用
Checkpoint 運行原理圖
Checkpoint 源碼解析
1、RDD.iterator 方法,它會先在緩存中查看數據 (內部會查看 Checkpoint 有沒有相關數據),然後再從 CheckPoint 中查看數據
Checkpoint 有兩種方法,一種是 reliably 和 一種是 locally
[下圖是 RDD.scala 中的 isCheckpointed 變量和 isCheckpointedAndMaterialized 方法]
2、通過調用 SparkContext.setCheckpointDir 方法來指定進行 Checkpoint 操作的 RDD 把數據放在那裏,在生產集群中是放在 HDFS 上的,同時為了提高效率在進行 Checkpoint 的時候可以指定很多目錄
3、在進行 RDD 的 Checkpoint 的時候,其所依賴的所有 RDD 都會清空掉;官方建議如果要進行 checkpoint 時,必需先緩存在內存中。但實際可以考慮緩存在本地磁盤上或者是第三方組件,e.g. Taychon 上。在進行 checkpoint 之前需要通過 SparkConetxt 設置 checkpoint 的文件夾
[下圖是 RDD.scala 中的 checkpoint 方法]
4、作為最佳實踐,一般在進行 checkpoint 方法調用前都要進行 persists 來把當前 RDD 的數據持久化到內存或者是磁盤上,這是因為 checkpoint 是 lazy 級別,必需有 Job 的執行且在Job 執行完成後才會從後往前回溯哪個 RDD 進行了Checkpoint 標記,然後對該標記了要進行 Checkpoint 的 RDD 新啟動一個Job 執行具體 Checkpoint 的過程;
5、Checkpoint 改變了 RDD 的 Lineage
6、當我們調用了checkpoint 方法要對RDD 進行Checkpoint 操作的話,此時框架會自動生成 RDDCheckpointData
7、當 RDD 上運行一個Job 後就會立即觸發 RDDCheckpointData 中的 checkpoint 方法,在其內部會調用 doCheckpoint( )方法,實際上在生產環境上會調用 ReliableRDDCheckpointData 的 doCheckpoint( )方法
8、在生產環境下會導致 ReliableRDDCheckpointData 的 writeRDDToCheckpointDirectory 的調用,而在 writeRDDToCheckpointDirectory 方法內部會觸發runJob 來執行當前的RDD 中的數據寫到Checkpoint 的目錄中,同時會產生ReliableCheckpointRDD 實例
Checkpoint的運行原理和源碼實現