【Spark MLlib】Performance Evaluation: MSE/RMSE and MAPK/MAP
Evaluating a Recommendation Model
MSE/RMSE
The mean squared error (MSE) is computed by summing pow(predicted rating - actual rating, 2) over every item that actually has a rating, then dividing by the number of items. The root mean squared error (RMSE) is simply the square root of the MSE.
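To make the definitions concrete, here is a minimal Python sketch that computes both metrics by hand (the toy ratings are made up for illustration):

```python
import math

def mse(actual, predicted):
    # mean of squared differences over all rated items
    return sum((a - p) ** 2 for a, p in zip(actual, predicted)) / len(actual)

def rmse(actual, predicted):
    # RMSE is simply the square root of MSE
    return math.sqrt(mse(actual, predicted))

actual = [2.0, 4.0, 3.0]
predicted = [2.5, 3.5, 3.0]
print(mse(actual, predicted))   # (0.25 + 0.25 + 0.0) / 3
print(rmse(actual, predicted))
```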
First we build a (user, product) RDD from ratings and pass it to model.predict(), producing an RDD keyed by (user, product) with the predicted rating as the value. Then we build a second RDD from ratings, keyed by (user, product) with the actual rating as the value, and join it with the first:
val usersProducts = ratings.map { case Rating(user, product, rating) => (user, product) }
val predictions = model.predict(usersProducts).map {
  case Rating(user, product, rating) => ((user, product), rating)
}
val ratingsAndPredictions = ratings.map {
  case Rating(user, product, rating) => ((user, product), rating)
}.join(predictions)
ratingsAndPredictions.first()
//res21: ((Int, Int), (Double, Double)) = ((291,800),(2.0,2.052364223387371))
To use MLlib's evaluation classes, we pass in an RDD of (actual, predicted) pairs. Since squared error is symmetric, swapping the positions of actual and predicted gives the same result:
import org.apache.spark.mllib.evaluation.RegressionMetrics
val predictedAndTrue = ratingsAndPredictions.map { case ((user, product), (actual, predicted)) => (actual, predicted) }
val regressionMetrics = new RegressionMetrics(predictedAndTrue)
println("Mean Squared Error = " + regressionMetrics.meanSquaredError)
println("Root Mean Squared Error = " + regressionMetrics.rootMeanSquaredError)
// Mean Squared Error = 0.08231947642632852
// Root Mean Squared Error = 0.2869137090247319
MAPK/MAP
Mean average precision at K (MAPK) can be understood roughly as follows:
Set K = 10, i.e. recommend 10 items. The 10 item IDs predicted to have the highest ratings for a user form one list, and the IDs of all items the user actually rated form another; the metric measures how well the first agrees with the second. (In my view, this evaluation method is not a great fit here.)
Sort the predicted item IDs by score and walk through them from the top: whenever a predicted ID appears in the set of actually rated IDs, add some score. A hit at a high rank should earn more than a hit at a low rank, since hits near the top better reflect the accuracy of the recommendations. Finally, divide the accumulated score by min(K, actual.size).
To evaluate across all users, sum the per-user scores and then divide by the number of users.
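The per-user computation described above can be sketched in plain Python. This is a generic APK implementation for illustration, not MLlib's code:

```python
def apk(predicted, actual, k=10):
    """Average precision at K for one user.

    predicted: item IDs ranked by predicted score, best first.
    actual: set of item IDs the user actually rated.
    """
    if not actual:
        return 0.0
    hits = 0
    score = 0.0
    for i, p in enumerate(predicted[:k]):
        if p in actual:
            hits += 1
            # a hit at rank i+1 contributes its running precision,
            # so hits near the top are worth more
            score += hits / (i + 1)
    return score / min(len(actual), k)

print(apk([1, 2, 3], {1, 3}, k=3))  # (1/1 + 2/3) / 2
```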
MLlib uses the global mean average precision (MAP, with no K cutoff). It expects an RDD of (predicted array, actual array) pairs.
First, let us generate the predicted side. We start by building the item-factor matrix:
/* Compute recommendations for all users */
import org.jblas.DoubleMatrix
val itemFactors = model.productFeatures.map { case (id, factor) => factor }.collect()
val itemMatrix = new DoubleMatrix(itemFactors)
println(itemMatrix.rows, itemMatrix.columns)
// (1682,50)
So that the worker nodes can access it, we distribute the matrix as a broadcast variable:
// broadcast the item factor matrix
val imBroadcast = sc.broadcast(itemMatrix)
Multiply the item matrix by each user's factor vector to compute the scores, pair each score with its index using scores.data.zipWithIndex, sort by score in descending order, and map the indices to product IDs to build a (userId, recommendedIds) RDD:
val allRecs = model.userFeatures.map { case (userId, array) =>
  val userVector = new DoubleMatrix(array)
  // score every item: (numItems x k) matrix times (k x 1) vector
  val scores = imBroadcast.value.mmul(userVector)
  // sort (score, index) pairs by score, descending
  val sortedWithId = scores.data.zipWithIndex.sortBy(-_._1)
  // indices are 0-based, but product IDs start at 1
  val recommendedIds = sortedWithId.map(_._2 + 1).toSeq
  (userId, recommendedIds)
}
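The scoring and ranking step is easier to see on a tiny example. The sketch below does the same computation in plain Python, using a hypothetical 3-item, 2-factor matrix:

```python
def recommend(item_factors, user_vector):
    # dot product of each item's factor row with the user vector
    scores = [sum(f * u for f, u in zip(row, user_vector))
              for row in item_factors]
    # item indices sorted by score, descending
    ranked = sorted(range(len(scores)), key=lambda i: -scores[i])
    # shift 0-based indices to 1-based product IDs
    return [i + 1 for i in ranked]

item_factors = [[1.0, 0.0],   # item 1
                [0.0, 1.0],   # item 2
                [1.0, 1.0]]   # item 3
print(recommend(item_factors, [2.0, 1.0]))  # [3, 1, 2]
```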
Next, extract the actual values:
// next get all the movie ids per user, grouped by user id
val userMovies = ratings.map{ case Rating(user, product, rating) => (user, product) }.groupBy(_._1)
// userMovies: org.apache.spark.rdd.RDD[(Int, Seq[(Int, Int)])] = MapPartitionsRDD[277] at groupBy at <console>:21
Build the (predicted array, actual array) RDD and apply the evaluation metric:
import org.apache.spark.mllib.evaluation.RankingMetrics
val predictedAndTrueForRanking = allRecs.join(userMovies).map { case (userId, (predicted, actualWithIds)) =>
  val actual = actualWithIds.map(_._2)
  (predicted.toArray, actual.toArray)
}
val rankingMetrics = new RankingMetrics(predictedAndTrueForRanking)
println("Mean Average Precision = " + rankingMetrics.meanAveragePrecision)
// Mean Average Precision = 0.07171412913757183
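For intuition, meanAveragePrecision corresponds roughly to the following Python sketch, which averages each user's precision over the full predicted list (no K cutoff) and normalizes by the number of actual items. The toy data is made up for illustration:

```python
def average_precision(predicted, actual):
    # AP over the whole predicted list: no K cutoff
    if not actual:
        return 0.0
    hits, score = 0, 0.0
    for i, p in enumerate(predicted):
        if p in actual:
            hits += 1
            score += hits / (i + 1)
    return score / len(actual)

def mean_average_precision(users):
    # users: list of (predicted list, actual set) pairs, one per user
    return sum(average_precision(p, a) for p, a in users) / len(users)

users = [([1, 2, 3], {2}),      # AP = (1/2) / 1 = 0.5
         ([1, 2, 3], {1, 2})]   # AP = (1/1 + 2/2) / 2 = 1.0
print(mean_average_precision(users))  # 0.75
```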