spark ALS 使用checkpoint 機制
阿新 • • 發佈:2019-02-18
最近在開發協同過濾元件,運用了spark millb上的協同過濾 ALS演算法。在測試過程中遇到了記憶體溢位的錯誤,查找了錯誤位置,發現在訓練的時候,迭代次數的增加便會出現這個問題,原因可能是迭代是的計算的資料量指數上升。為了解決這個問題,參考了許多技術文件後總結出一些自己的見解。
1)在ALS模型中運用checkpoint機制
spark checkpoint 機制 個人理解就是在程式中插入一個檢查點,如果程式出錯,重新執行程式便會從這個檢查點開始,不需要重頭再來,大大加強了程式的效率。
在使用checkpoint前 需要先設定checkpoint和相關引數,再進行訓練。相關程式碼如下
sc.setCheckpointDir( "hdfs://master:9000/..")//會在..目錄建立一個資料夾 這裡的sc 是 sparkContext
ALS.setCheckpointInterval(2).setMaxIter(100).setRank(10).setAlpha(0.1)
val model = ALS.run(ratings)
這樣就在ALS 模型中運用了checkpoint 機制。
2)刪除checkpoint留下的過程資料
但是這樣會有一個弊端,每執行一次訓練都會在hdfs上儲存訓練的過程資料,佔用叢集儲存空間。解決的方法是利用spark操作(刪除)hdfs上的檔案。
sparkConf.set("spark.hadoop.validateOutputSpecs","false") //設定你spark的配置
val sc = SparkContext(sparkConf) //讀取你的saprkconf 到 sparkContext
通過spark自帶的hadoopconf方式刪除
val path = new Path("hdfs://xxx"); 宣告要操作(刪除)的hdfs 檔案路徑
val hadoopConf = sparkContext.hadoopConfiguration
val hdfs = org.apache.hadoop.fs.FileSystem.get(hadoopConf)
if(hdfs.exists(path)){
//需要遞迴刪除設定true,不需要則設定false
hdfs.delete(path,true) //這裡因為是過程資料,可以遞迴刪除
}
總結,checkpoint處理spark mllib記憶體溢位的方法只能治標,不能治本。只是將記憶體中的資料固化到硬碟上,解決記憶體不足的問題。雖然解決了記憶體溢位,但如果資料量大的話估計還會出現一系列類似IO的問題。然而還沒有找到更好的解決方法,也就先這樣吧。學無止境,估計以後會有大神分享更好的解決方法。