最近在開發協同過濾元件,運用了spark millb上的協同過濾 ALS演算法。在測試過程中遇到了記憶體溢位的錯誤,查找了錯誤位置,發現在訓練的時候,迭代次數的增加便會出現這個問題,原因可能是迭代是的計算的資料量指數上升。為了解決這個問題,參考了許多技術文件後總結出一些自己的見解。

1)在ALS模型中運用checkpoint機制

spark checkpoint 機制 個人理解就是在程式中插入一個檢查點,如果程式出錯,重新執行程式便會從這個檢查點開始,不需要重頭再來,大大加強了程式的效率。

在使用checkpoint前 需要先設定checkpoint和相關引數,再進行訓練。相關程式碼如下

sc.setCheckpointDir("hdfs://master:9000/..")//會在..目錄建立一個資料夾 這裡的sc 是 sparkContext
ALS.setCheckpointInterval(2).setMaxIter(100).setRank(10).setAlpha(0.1)
val model = ALS.run(ratings)

這樣就在ALS 模型中運用了checkpoint 機制。

2)刪除checkpoint留下的過程資料

但是這樣會有一個弊端,每執行一次訓練都會在hdfs上儲存訓練的過程資料,佔用叢集儲存空間。解決的方法是利用spark操作(刪除)hdfs上的檔案。

sparkConf.set("spark.hadoop.validateOutputSpecs","false")   //設定你spark的配置
val sc = SparkContext(sparkConf)   //讀取你的saprkconf 到 sparkContext
通過spark自帶的hadoopconf方式刪除
val path = new Path("hdfs://xxx"); 宣告要操作(刪除)的hdfs 檔案路徑
val hadoopConf = sparkContext.hadoopConfiguration
val hdfs = org.apache.hadoop.fs.FileSystem.get(hadoopConf)
if(hdfs.exists(path)){
//需要遞迴刪除設定true,不需要則設定false
hdfs.delete(path,true)  //這裡因為是過程資料,可以遞迴刪除


總結,checkpoint處理spark mllib記憶體溢位的方法只能治標,不能治本。只是將記憶體中的資料固化到硬碟上,解決記憶體不足的問題。雖然解決了記憶體溢位,但如果資料量大的話估計還會出現一系列類似IO的問題。然而還沒有找到更好的解決方法,也就先這樣吧。學無止境,估計以後會有大神分享更好的解決方法。