1. 程式人生 > >SparkML之推薦引擎(二)—— 推薦模型評估

SparkML之推薦引擎(二)—— 推薦模型評估

本文內容和程式碼是接著上篇文章來寫的,推薦先看一下哈~
我們上一篇文章是寫了電影推薦的實現,但是推薦內容是否合理呢,這就需要我們對模型進行評估
針對推薦模型,這裡根據 均方差K值平均準確率 來對模型進行評估,MLlib也對這幾種評估方法都有提供內建的函式

在真實情況下,是要不斷地對推薦模型的三個關鍵引數 rank、iterations、lambda 分別選取不同的值,然後對不同引數生成的模型進行評估,從而選取出最好的模型。

下面就對兩種推薦模型評估的方法進行說明~


1、均方差(MSE) 和 均方根誤差(RMSE)

定義:各平方誤差的和與總數目的商。其實可以理解為 預測到的評級 與 真實評級的差值 的平方。
均方根誤差的使用也很普遍,其計算只需在MSE上取平方根即可~

評估程式碼為:

//格式:(userID,電影)
val userProducts: RDD[(Int, Int)] = ratings.map(rating => (rating.user, rating.product))
//模型推測出的評分資訊,格式為:((userID,電影), 推測評分)
val predictions: RDD[((Int, Int), Double)] = model.predict(userProducts).map(rating => ((rating.user, rating.product),rating.rating))
//格式為:((userID,電影), (真實平評分,推測評分))
val ratingsAndPredictions: RDD[((Int, Int), (Double, Double))] = ratings.map
(rating => ((rating.user, rating.product), rating.rating)) .join(predictions) //均方差 val MSE = ratingsAndPredictions.map(rap => math.pow(rap._2._1 - rap._2._2, 2)).reduce(_+_) / ratingsAndPredictions.count() println("MSE:" + MSE) //均方根誤差 val RMSE: Double = math.sqrt
(MSE) println("RMSE:" + RMSE)

上面是我們自己算出來的,也可以用MLlib內建的函式來算:

import org.apache.spark.mllib.evaluation.{RegressionMetrics, RankingMetrics}
val predictedAndTrue: RDD[(Double, Double)] = ratingsAndPredictions.map{ case((userID, product),(actual, predict)) => (actual, predict)}
val regressionMetrics: RegressionMetrics = new RegressionMetrics(predictedAndTrue)
println("MSE:" + regressionMetrics.meanSquaredError)
println("RMSE:" + regressionMetrics.rootMeanSquaredError)

輸出為:

MSE:0.08231947642632852
RMSE:0.2869137090247319

2、K值平均準確率(MAPK)

K值平均準確率(MAPK)的意思是整個資料集上的K值平均準確率(APK)的均值。APK是資訊檢索中常用的一個指標。它用於衡量針對某個查詢所返回的“前K個”文件的平均相關性。
如果結果中文件的實際相關性越高且排名也更靠前,那APK分值也就越高。如果在預測結果中得分更高(在推薦列表中排名也更靠前)的物品實際上也與使用者更相關,那自然這個模型就更好。

ok,MAPK評估程式碼如下:

package ml

import org.apache.spark.mllib.evaluation.RankingMetrics
import org.apache.spark.mllib.recommendation.{Rating, ALS}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkContext, SparkConf}
import org.jblas.DoubleMatrix
import sql.StreamingExamples
import scala.collection.Map

object MAPKTest{
  def main(args: Array[String]) {
    StreamingExamples.setStreamingLogLevels()
    val conf = new SparkConf().setAppName("MAPKTest").setMaster("local[*]")
    val sc = new SparkContext(conf)
    /*使用者 電影 評分*/
    val rawData: RDD[String] = sc.textFile("file:///E:/spark/ml-100k/u.data")
    //去掉時間的欄位,rawRatings:Array
    val rawRatings = rawData.map(_.split("\\t").take(3))
    //user moive rating
    val ratings = rawRatings.map{case Array(user, movie, rating) =>{
      Rating(user.toInt, movie.toInt, rating.toDouble)
    }}
    /**
      * 得到訓練的模型
      * 注意:50代表我們得到的模型的因子的列的數量,名稱叫 因子維數
      */
    val model = ALS.train(ratings, 50, 10, 0.01)

    /*獲取模型中所有商品的 factor,並轉換成矩陣*/
    val itemFactors: Array[Array[Double]] = model.productFeatures.map{case (id, factor) => factor}.collect()
    val itemMatrix: DoubleMatrix = new DoubleMatrix(itemFactors)
//    println(itemMatrix.rows, itemMatrix.columns)

    /*獲得模型中每個使用者對應的每個電影的評分*/
    val allRecs = model.userFeatures.map{ case(userId, factor) => {
      val userVector = new DoubleMatrix(factor)
      /**
        * socres是一個DoubleMatrix型別,值為1行N列的 Vector
        * 為什麼可以通過判斷這兩個矩陣的乘積的大小,從而來判斷分數呢?
        * 這歸根於ALS演算法,該演算法是將一個 使用者-商品 的矩陣 拆分成 使用者、商品兩個矩陣
        * 因此這兩個矩陣的乘積就是實際的 分數
        */
      val scores = itemMatrix.mmul(userVector)//矩陣和向量的乘積,求出每個使用者的分數
      //根據評分倒數排序
      val sortedWithId = scores.data.zipWithIndex.sortBy(-_._1)
      //(score, itemId)
      val recommendIds = sortedWithId.map(_._2 + 1).toSeq
      //返回使用者 和 各個商品評分的倒數的值 的 tuple: (userId,(sorce, itemId))
      (userId, recommendIds)
    }}

    /*獲取實際中的 每個使用者對應的有評分過的電影的評分*/
    val userMoives: RDD[(Int, Iterable[(Int, Int)])] = ratings.map{ case Rating(user, product, rating) => {
      (user, product)
    }}.groupBy(_._1)

    val predictedAndTrueForRanking = allRecs.join(userMoives).map{ case( userId, (predicted, actualWithIds) ) => {
      //實際的商品編號
      val actual = actualWithIds.map(_._2)
      (actual.toArray, predicted.toArray)
    }}
    val rankingMetrics: RankingMetrics[Int] = new RankingMetrics(predictedAndTrueForRanking)
    println("使用內建的計算MAP:" + rankingMetrics.meanAveragePrecision)
  }
}

輸出結果為:

使用內建的計算MAP0.0630466936422453

3、推薦模型完整程式碼
package ml

import org.apache.spark.mllib.evaluation.{RegressionMetrics, RankingMetrics}
import org.apache.spark.mllib.recommendation.{Rating, ALS}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkContext, SparkConf}
import org.jblas.DoubleMatrix
import sql.StreamingExamples
import scala.collection.Map

/**
  * 基於Spark MLlib 的推薦演算法
  * ALS:最小二乘法
  *
  * @author lwj
  * @date 2018/05/04
  */
object Recommend{
  /**
    * 用於商品推薦
    * 通過傳入兩個向量,返回這兩個向量之間的餘弦相似度
    *
    * @param vec1
    * @param vec2
    * @return
    */
  def cosineSimilarity(vec1: DoubleMatrix, vec2: DoubleMatrix): Double = {
    vec1.dot(vec2) / (vec1.norm2() * vec2.norm2())
  }

  /**
    * 模型評估
    * K值平均準確率(APK)
    *
    * @param actual
    * @param predicted
    * @param k
    * @return
    */
  def avgPrecisionK(actual: Seq[Int], predicted: Seq[Int], k: Int) : Double = {
    val predK: Seq[Int] = predicted.take(k)
    var score = 0.0
    var numHits = 0.0
    for ((p, i) <- predK.zipWithIndex){
      if (actual.contains(p)){
        numHits += 1.0
        score += numHits / (i.toDouble + 1.0) //TODO 為什麼除以i.toDouble
      }
    }
    if (actual.isEmpty){
      1.0
    }else{
      score / math.min(actual.size, k).toDouble //TODO 為什麼是min
    }
  }


  def main(args: Array[String]) {
    StreamingExamples.setStreamingLogLevels()
    val conf = new SparkConf().setAppName("recommandTest").setMaster("local[*]")
    val sc = new SparkContext(conf)
    /*使用者 電影 評分*/
    val rawData: RDD[String] = sc.textFile("file:///E:/spark/ml-100k/u.data")
    //去掉時間的欄位,rawRatings:Array
    val rawRatings = rawData.map(_.split("\\t").take(3))
    //user moive rating
    val ratings = rawRatings.map{case Array(user, movie, rating) =>{
      Rating(user.toInt, movie.toInt, rating.toDouble)
    }}
    //電影
    val movies: RDD[String] = sc.textFile("file:///E:/spark/ml-100k/u.item")
    //電影ID 電影名
    val titles: Map[Int, String] = movies.map(_.split("\\|").take(2)).map(array => (array(0).toInt, array(1))).collectAsMap()
    /**
      * 得到訓練的模型
      * 注意:50代表我們得到的模型的因子的列的數量,名稱叫 因子維數
      */
    val model = ALS.train(ratings, 50, 10, 0.01)

    /**
      * 基於使用者進行推薦
      */
    //使用者因子的數量
    //  println(mode.userFeatures.count())
    //商品因子的數量
    //  println(mode.productFeatures.count())
    //檢視某個使用者對某個商品的預測評分,ALS模型的初始化是隨機的,所以產生的結果可能會不同
    //  println(mode.predict(789, 123))

    //為指定的使用者推薦 N 個商品
    val userID = 789
    val K = 10
    val topKRecs: Array[Rating] = model.recommendProducts(userID, 10)
    //  println(topKRecs.mkString("\n"))

    //獲取指定使用者所評價過的電影
    val moviesForUser: Seq[Rating] = ratings.keyBy(_.user).lookup(789)

    //打印出指定使用者評價最高的10部電影的名稱和評分
    println("真實的:")
    moviesForUser.sortBy(-_.rating).take(10).map(rating => {
      (titles(rating.product),rating.rating)
    }).foreach(println)

    //打印出推薦給使用者的10部電影的名稱和評分,和上面的進行比較
    println("推薦的:")
    topKRecs.map(rating => {
      (titles(rating.product),rating.rating)
    }).foreach(println)


    println("\n-----------------------\n")

    /**
      * 基於商品進行推薦
      */
    /*通過商品ID獲得與該商品相似的商品*/
    val itemId = 567
    val itemFactor: Array[Double] = model.productFeatures.lookup(itemId).head
    val itemVector: DoubleMatrix = new DoubleMatrix(itemFactor)
    //獲得每個商品與給出的商品的餘弦相似度
    val sims = model.productFeatures.map{case (id, factor) => {
      val factorVector = new DoubleMatrix(factor)
      val sim = cosineSimilarity(factorVector, itemVector)
      (id, sim)
    }}
    //打印出前N的商品
    val topItem: Array[(Int, Double)] = sims.sortBy(-_._2).take(10 + 1)
    println("與567商品相似的商品:\n" + topItem.mkString("\n") + "\n")

    /*校驗商品*/
    println("給定的商品名稱為: " + titles(itemId))
    println("相似的商品名稱為:")
    topItem.slice(1, 11).foreach(item => println(titles(item._1)))


    println("\n-----------------------\n")

    /*模型評估*/
    /**
      * 均方差評估
      * 對model全量資料進行評估
      */
//    val actualRating: Rating = moviesForUser.take(1)(0)
//    val predictedRating: Double = model.predict(789, actualRating.product)
//    println("\n真實分:" + actualRating.rating + "  預測分:" + predictedRating)
    //格式:(userID,電影)
    val userProducts: RDD[(Int, Int)] = ratings.map(rating => (rating.user, rating.product))
    //模型推測出的評分資訊,格式為:((userID,電影), 推測評分)
    val predictions: RDD[((Int, Int), Double)] = model.predict(userProducts).map(rating => ((rating.user, rating.product),rating.rating))
    //格式為:((userID,電影), (真實平評分,推測評分))
    val ratingsAndPredictions: RDD[((Int, Int), (Double, Double))] = ratings.map(rating => ((rating.user, rating.product), rating.rating))
                                                                            .join(predictions)
    //均方差
    val MSE = ratingsAndPredictions.map(rap => math.pow(rap._2._1 - rap._2._2, 2)).reduce(_+_) / ratingsAndPredictions.count()
    println("均方差MSE為: " + MSE)
    //均方根誤差
    val RMSE: Double = math.sqrt(MSE)
    println("均方根誤差RMSE為: " + RMSE)

    /**
      * K值平均準確率評估
      * 注意:該評估模型是針對對使用者感興趣和回去接觸的物品的預測能力
      * 也是就是說:這時針對基於使用者推薦的 模型的評估
      */
    /*計算 單個 指定使用者推薦的APK指標*/
    val actualMovies: Seq[Int] = moviesForUser.map(_.product)
    val predictedMovies: Array[Int] = topKRecs.map(_.product)
    val apk10: Double = avgPrecisionK(actualMovies, predictedMovies, 10)
    println("789的APK值為:" + apk10)

    /*獲取模型中所有商品的 factor,並轉換成矩陣*/
    val itemFactors: Array[Array[Double]] = model.productFeatures.map{case (id, factor) => factor}.collect()
    val itemMatrix: DoubleMatrix = new DoubleMatrix(itemFactors)
//    println(itemMatrix.rows, itemMatrix.columns)

    /*獲得模型中每個使用者對應的每個電影的評分*/
    val allRecs = model.userFeatures.map{ case(userId, factor) => {
      val userVector = new DoubleMatrix(factor)
      /**
        * socres是一個DoubleMatrix型別,值為1行N列的 Vector
        * 為什麼可以通過判斷這兩個矩陣的乘積的大小,從而來判斷分數呢?
        * 這歸根於ALS演算法,該演算法是將一個 使用者-商品 的矩陣 拆分成 使用者、商品兩個矩陣
        * 因此這兩個矩陣的乘積就是實際的 分數
        */
      val scores = itemMatrix.mmul(userVector)//矩陣和向量的乘積,求出每個使用者的分數
      //根據評分倒數排序
      val sortedWithId = scores.data.zipWithIndex.sortBy(-_._1)
      //(score, itemId)
      val recommendIds = sortedWithId.map(_._2 + 1).toSeq
      //返回使用者 和 各個商品評分的倒數的值 的 tuple: (userId,(sorce, itemId))
      (userId, recommendIds)
    }}

    /*獲取實際中的 每個使用者對應的有評分過的電影的評分*/
    val userMoives: RDD[(Int, Iterable[(Int, Int)])] = ratings.map{ case Rating(user, product, rating) => {
      (user, product)
    }}.groupBy(_._1)

    val MAPK = allRecs.join(userMoives).map{ case( userId, (predicted, actualWithIds) ) => {
      //實際的商品編號
      val actual = actualWithIds.map(_._2).toSeq
      avgPrecisionK(actual, predicted, 10)
    }}.reduce(_ + _) / allRecs.count

    println("MAPK:" + MAPK)


    println("\n-----------------------\n")

    /**
      * 使用MLlib內建的評估器
      */
    /*RMSE 和 MSE*/
    val predictedAndTrue: RDD[(Double, Double)] = ratingsAndPredictions.map{ case((userID, product),(actual, predict)) => (actual, predict)}
    val regressionMetrics: RegressionMetrics = new RegressionMetrics(predictedAndTrue)
    println("使用內建的計算MSE:" + regressionMetrics.meanSquaredError)
    println("使用內建的計算RMSE:" + regressionMetrics.rootMeanSquaredError)

    /*MAPK*/
    val predictedAndTrueForRanking = allRecs.join(userMoives).map{ case( userId, (predicted, actualWithIds) ) => {
      //實際的商品編號
      val actual = actualWithIds.map(_._2)
      (actual.toArray, predicted.toArray)
    }}
    val rankingMetrics: RankingMetrics[Int] = new RankingMetrics(predictedAndTrueForRanking)
    println("使用內建的計算MAP:" + rankingMetrics.meanAveragePrecision)


  }
}

本文參考自: 《Spark機器學習》和Spark官網 http://spark.apache.org/docs/1.6.3/mllib-guide.html