1. 程式人生 > >【總結】論spark中的cache/persist/checkpoint

【總結】論spark中的cache/persist/checkpoint

1. cache與persist

cache 能夠讓重複資料在同一個 application 中的 jobs 間共享。RDD的cache()方法其實呼叫的就是persist方法,快取策略均為MEMORY_ONLY。

下面簡單引入一下cache的機制:

  • 哪些 RDD 需要 cache?

    會被重複使用的(但不能太大)。

  • 使用者怎麼設定哪些 RDD 要 cache?

    因為使用者只與 driver program 打交道,因此只能用 rdd.cache() 去 cache 使用者能看到的 RDD。所謂能看到指的是呼叫 transformation() 後生成的 RDD,而某些在 transformation() 中 Spark 自己生成的 RDD 是不能被使用者直接 cache 的,比如 reduceByKey() 中會生成的 ShuffledRDD、MapPartitionsRDD 是不能被使用者直接 cache 的。

  • driver program 設定 rdd.cache() 後,系統怎麼對 RDD 進行 cache?

    先不看實現,自己來想象一下如何完成 cache:當 task 計算得到 RDD 的某個 partition 的第一個 record 後,就去判斷該 RDD 是否要被 cache,如果要被 cache 的話,將這個 record 及後續計算的到的 records 直接丟給本地 blockManager 的 memoryStore,如果 memoryStore 存不下就交給 diskStore 存放到磁碟。
    實際實現與設想的基本類似,區別在於:將要計算 RDD partition 的時候(而不是已經計算得到第一個 record 的時候)就去判斷 partition 要不要被 cache。如果要被 cache 的話,先將 partition 計算出來,然後 cache 到記憶體。cache 只使用 memory,寫磁碟的話那就叫 checkpoint 了。
    呼叫 rdd.cache() 後, rdd 就變成 persistRDD 了,其 StorageLevel 為 MEMORY_ONLY。persistRDD 會告知 driver 說自己是需要被 persist 的。

  • cached RDD 怎麼被讀取

    下次計算(一般是同一 application 的下一個 job 計算)時如果用到 cached RDD,task 會直接去 blockManager 的 memoryStore 中讀取。具體地講,當要計算某個 rdd 中的 partition 時候(通過呼叫 rdd.iterator())會先去 blockManager 裡面查詢是否已經被 cache 了,如果 partition 被 cache 在本地,就直接使用 blockManager.getLocal() 去本地 memoryStore 裡讀取。如果該 partition 被其他節點上 blockManager cache 了,會通過 blockManager.getRemote() 去其他節點上讀取。

cache與persist的唯一區別在於: cache只有一個預設的快取級別MEMORY_ONLY ,而persist可以根據StorageLevel設定其它的快取級別。這裡注意一點cache或者persist並不是action

2. cache與checkpoint

關於這個問題,Tathagata Das 有一段回答: There is a significant difference between cache and checkpoint.Cache materializes the RDD and keeps it in memory and/or disk(其實只有 memory). But the lineage(也就是 computing chain) of RDD (that is, seq of operations that generated the RDD) will be remembered, so that if there are node failures and parts of the cached RDDs are lost, they can be regenerated. However, checkpoint saves the RDD to an HDFS file and actually forgets the lineage completely. This is allows longlineages to be truncated and the data to be saved reliably in HDFS (which is naturally fault tolerant by replication).
即cache 和 checkpoint 的顯著區別是:cache把 RDD 計算出來然後放在記憶體中, 但是RDD 的依賴鏈也不能丟掉, 當某個點某個 executor 宕了, 上面cache 的RDD就會丟掉, 需要通過依賴鏈重新計算出來;而 checkpoint 是把 RDD 儲存在 HDFS中, 是多副本可靠儲存,所以依賴鏈就可以丟掉了,就斬斷了依賴鏈,因為checkpoint是需要把 job 重新從頭算一遍, 最好先cache一下, checkpoint就可以直接儲存快取中的 RDD 了, 就不需要重頭計算一遍了, 對效能有極大的提升。

這裡值得注意的是:cache 機制是每計算出一個要 cache 的 partition 就直接將其 cache 到記憶體了。但 checkpoint 沒有使用這種第一次計算得到就儲存的方法,而是等到 job 結束後另外啟動專門的 job 去完成 checkpoint 。也就是說需要 checkpoint 的 RDD 會被計算兩次。因此,在使用 rdd.checkpoint() 的時候,建議加上 rdd.cache(),這樣第二次執行的 job 就不用再去計算該 rdd 了,直接讀取 cache 寫磁碟。

3. persist與checkpoint

rdd.persist(StorageLevel.DISK_ONLY) 與 checkpoint 區別的是:前者雖然可以將 RDD 的 partition 持久化到磁碟,但該 partition 由 blockManager 管理。一旦 driver program 執行結束,也就是 executor 所在程序 CoarseGrainedExecutorBackend stop,blockManager 也會 stop,被 cache 到磁碟上的 RDD 也會被清空(整個 blockManager 使用的 local 資料夾被刪除)。而 checkpoint 將 RDD 持久化到 HDFS 或本地資料夾,如果不被手動 remove 掉( 話說怎麼 remove checkpoint 過的 RDD? ),是一直存在的,也就是說可以被下一個 driver program 使用,而 cached RDD 不能被其他 dirver program 使用。

總結

Spark相比Hadoop的優勢在於儘量不去持久化,所以使用 pipeline,cache 等機制。使用者如果感覺 job 可能會出錯可以手動去 checkpoint 一些 critical 的 RDD,job 如果出錯,下次執行時直接從 checkpoint 中讀取資料。唯一不足的是,checkpoint 需要兩次執行 job。