spark的checkpoint
阿新 • • 發佈:2018-10-31
checkpoint的機制保證了需要訪問重複資料的應用Spark的DAG執行行圖可能很龐大,任務中計算鏈可能會很長,這時如果任務中途執行出錯,那麼任務的整個需要重算非常耗時,因此,有必要將計算代價較大的RDD checkpoint一下,當下遊RDD計算出錯時,可以直接從checkpoint過的RDD那裡讀取資料繼續算。
import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.apache.spark.SparkConf object CheckPointTest { def main(args: Array[String]) { val sc: SparkContext = SparkContext.getOrCreate(new SparkConf().setAppName("ck").setMaster("local[2]")) sc.setCheckpointDir("/Users/kinge/ck") val rdd: RDD[(String, Int)] = sc.textFile("").map{x=>(x,1) }.reduceByKey(_+_) rdd.checkpoint() rdd.count() rdd.groupBy(x=>x._2).collect().foreach(println) } }
作者:那年的壞人
連結:https://www.jianshu.com/p/653ebabc8f87
來源:簡書
簡書著作權歸作者所有,任何形式的轉載都請聯絡作者獲得授權並註明出處。