
[Spark MLlib] Performance Evaluation: MSE/RMSE and MAPK/MAP

Evaluating the Recommendation Model

MSE/RMSE

Mean squared error (MSE) is computed by summing pow(predicted rating - actual rating, 2) over every item that has an actual rating, then dividing by the number of items. Root mean squared error (RMSE) is simply the square root of MSE.
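The definition can be sketched in plain Scala (outside Spark), using a few hypothetical (actual, predicted) rating pairs made up for illustration:

```scala
// Hypothetical (actual, predicted) rating pairs, for illustration only
val pairs = Seq((2.0, 2.05), (4.0, 3.5), (3.0, 3.1))

// MSE: mean of squared differences between predicted and actual ratings
val mse = pairs.map { case (actual, predicted) =>
  math.pow(predicted - actual, 2)
}.sum / pairs.size

// RMSE: square root of MSE
val rmse = math.sqrt(mse)

println(s"MSE = $mse, RMSE = $rmse")
```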

We first map ratings to a (user, product) RDD and pass it to model.predict(), producing an RDD keyed by (user, product) with the predicted rating as the value. We then build a second RDD from ratings, keyed by (user, product) with the actual rating as the value, and join it with the predictions:

val usersProducts = ratings.map { case Rating(user, product, rating) => (user, product) }
val predictions = model.predict(usersProducts).map { case Rating(user, product, rating) =>
  ((user, product), rating)
}
val ratingsAndPredictions = ratings.map { case Rating(user, product, rating) =>
  ((user, product), rating)
}.join(predictions)
ratingsAndPredictions.first()
// res21: ((Int, Int), (Double, Double)) = ((291,800),(2.0,2.052364223387371))

To use MLlib's evaluation classes, we pass in an RDD of (actual, predicted) pairs. Since squared error is symmetric, swapping the two positions does not change MSE or RMSE:

import org.apache.spark.mllib.evaluation.RegressionMetrics
val predictedAndTrue = ratingsAndPredictions.map { case ((user, product), (actual, predicted)) =>
  (actual, predicted)
}
val regressionMetrics = new RegressionMetrics(predictedAndTrue)
println("Mean Squared Error = " + regressionMetrics.meanSquaredError)
println("Root Mean Squared Error = " + regressionMetrics.rootMeanSquaredError)
// Mean Squared Error = 0.08231947642632852
// Root Mean Squared Error = 0.2869137090247319

MAPK/MAP

Average precision at K (APK, averaged across users as MAPK) can be understood roughly as follows:
Suppose we recommend K = 10 items. Take the 10 item IDs the model scores highest for a user as one list, and all item IDs the user has actually rated as another, and measure how well the first agrees with the second. (Arguably this metric is not a perfect fit here, since every rated item counts as relevant.)
We walk through the predicted IDs in score order; whenever a predicted ID appears in the set of actually rated IDs, we add to the score, with hits at higher ranks earning more than hits at lower ranks, since they better reflect the accuracy of the recommendations. Finally we divide the accumulated score by min(K, actual.size).
To evaluate across all users, we sum the per-user scores and divide by the number of users.
MLlib implements the global mean average precision (MAP, with no K cutoff). It expects an RDD of (predicted Array, actual Array) pairs.
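The per-user scoring procedure just described can be sketched as a small Scala function (a common formulation of APK; the function name and variables are my own, not from MLlib):

```scala
// Average precision at K for one user: walk the top-K predictions in rank
// order, credit each hit with (hits so far) / (current rank), then normalize
// by min(K, number of relevant items).
def avgPrecisionK(actual: Seq[Int], predicted: Seq[Int], k: Int): Double = {
  val predK = predicted.take(k)
  var numHits = 0.0
  var score = 0.0
  for ((p, i) <- predK.zipWithIndex) {
    if (actual.contains(p)) {
      numHits += 1.0
      score += numHits / (i + 1.0) // hits at higher ranks contribute more
    }
  }
  if (actual.isEmpty) 0.0
  else score / math.min(actual.size, k).toDouble
}

// e.g. relevant items {1, 2, 3}, predicted ranking (1, 4, 3), K = 3
val apk = avgPrecisionK(Seq(1, 2, 3), Seq(1, 4, 3), 3)
```

Here the hit at rank 1 contributes 1/1 and the hit at rank 3 contributes 2/3, so apk = (1 + 2/3) / 3.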
Now let's generate the predictions, starting with the item-factor matrix:

/* Compute recommendations for all users */
import org.jblas.DoubleMatrix
val itemFactors = model.productFeatures.map { case (id, factor) => factor }.collect()
val itemMatrix = new DoubleMatrix(itemFactors)
println(itemMatrix.rows, itemMatrix.columns)
// (1682,50)

So that the worker nodes can access it, we distribute this matrix as a broadcast variable:

// broadcast the item factor matrix
val imBroadcast = sc.broadcast(itemMatrix)

We multiply the item matrix by each user's factor vector to get scores, pair each score with its row index via scores.data.zipWithIndex, and sort by score descending. Mapping the sorted indices to item IDs gives recommendedIds, from which we build a (userId, recommendedIds) RDD:

val allRecs = model.userFeatures.map { case (userId, array) =>
  val userVector = new DoubleMatrix(array)
  val scores = imBroadcast.value.mmul(userVector)
  // pair each score with its row index, then sort by score descending
  val sortedWithId = scores.data.zipWithIndex.sortBy(-_._1)
  // row index + 1 = movie ID, since item IDs start at 1 but matrix rows at 0
  val recommendedIds = sortedWithId.map(_._2 + 1).toSeq
  (userId, recommendedIds)
}

Next, extract the actual values:

// next get all the movie ids per user, grouped by user id
val userMovies = ratings.map{ case Rating(user, product, rating) => (user, product) }.groupBy(_._1)
// userMovies: org.apache.spark.rdd.RDD[(Int, Seq[(Int, Int)])] = MapPartitionsRDD[277] at groupBy at <console>:21

Now build the (predicted Array, actual Array) RDD and apply the evaluation class:

import org.apache.spark.mllib.evaluation.RankingMetrics
val predictedAndTrueForRanking = allRecs.join(userMovies).map{ case (userId, (predicted, actualWithIds)) => 
  val actual = actualWithIds.map(_._2)
  (predicted.toArray, actual.toArray)
}
val rankingMetrics = new RankingMetrics(predictedAndTrueForRanking)
println("Mean Average Precision = " + rankingMetrics.meanAveragePrecision)
// Mean Average Precision = 0.07171412913757183
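What meanAveragePrecision computes can be mimicked in plain Scala: for each user, sum the precision at every hit position over the full ranking (no K cutoff), divide by that user's number of relevant items, then average over users. This is a sketch with made-up data, not MLlib's actual implementation:

```scala
// Per-user average precision over the full predicted ranking (no K cutoff)
def avgPrecision(actual: Seq[Int], predicted: Seq[Int]): Double = {
  var numHits = 0.0
  var score = 0.0
  for ((p, i) <- predicted.zipWithIndex) {
    if (actual.contains(p)) {
      numHits += 1.0
      score += numHits / (i + 1.0)
    }
  }
  if (actual.isEmpty) 0.0 else score / actual.size
}

// Hypothetical (actual, predicted) lists for two users
val perUser = Seq(
  (Seq(1, 2), Seq(1, 3, 2)),
  (Seq(4), Seq(5, 4))
)

// MAP: average the per-user scores across all users
val meanAP = perUser.map { case (a, p) => avgPrecision(a, p) }.sum / perUser.size
```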