1. 程式人生 > >Spark Mllib構建簡單的電影推薦系統(轉)

Spark Mllib構建簡單的電影推薦系統(轉)

基於模型的協同過濾應用–電影推薦
資料來源:資料
本文實現對使用者推薦電影的簡單應用。
1、測試資料描述
本次測試資料主要包括四個資料檔案:(詳細的資料描述參見README檔案)
1)使用者資料檔案
使用者ID::性別::年齡::職業編號::郵編
這裡寫圖片描述

2)電影資料檔案
電影ID::電影名稱::電影種類
這裡寫圖片描述
3)評分資料檔案
使用者ID::電影ID::評分::時間
這裡寫圖片描述
4)測試資料
使用者ID::電影ID::評分::時間
這裡寫圖片描述


這裡,前三個資料檔案用於模型訓練,第四個資料檔案用於測試模型。

    2、實現程式碼:
/**
  * Created by Administrator on 2017/4/6.
  */

import org.apache.log4j.{Level, Logger}
import org.apache.spark.mllib.recommendation.{ALS, MatrixFactorizationModel, Rating}
import org.apache.spark.rdd._
import org.apache.spark.{SparkContext, SparkConf}


import scala.io
.Source object RecommandDemo1 { def main(args:Array[String]) { //遮蔽不必要的日誌顯示在終端上 Logger.getLogger("org.apache.spark").setLevel(Level.WARN) Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF) //設定執行環境 val sparkConf = new SparkConf().setAppName("MovieLensALS"
).setMaster("local[3]") val sc = new SparkContext(sparkConf) //裝載使用者評分,該評分由評分器生成(即生成檔案personalRatings.txt) val myRatings = loadRatings(args(1)) val myRatingsRDD = sc.parallelize(myRatings, 1) //樣本資料目錄 val movielensHomeDir = args(0) //裝載樣本評分資料,其中最後一列Timestamp取除10的餘數作為key,Rating為值,即(Int,Rating) val ratings = sc.textFile(movielensHomeDir + "/ratings.dat").map { line => val fields = line.split("::") // format: (timestamp % 10, Rating(userId, movieId, rating)) (fields(3).toLong % 10, Rating(fields(0).toInt, fields(1).toInt, fields(2).toDouble)) } //裝載電影目錄對照表(電影ID->電影標題) val movies = sc.textFile(movielensHomeDir + "/movies.dat").map { line => val fields = line.split("::") // format: (movieId, movieName) (fields(0).toInt, fields(1)) }.collect().toMap //統計有使用者數量和電影數量以及使用者對電影的評分數目 val numRatings = ratings.count() val numUsers = ratings.map(_._2.user).distinct().count() val numMovies = ratings.map(_._2.product).distinct().count() println("Got " + numRatings + " ratings from " + numUsers + " users " + numMovies + " movies") //將樣本評分表以key值切分成3個部分,分別用於訓練 (60%,並加入使用者評分), 校驗 (20%), and 測試 (20%) //該資料在計算過程中要多次應用到,所以cache到記憶體 val numPartitions = 4 val training = ratings.filter(x => x._1 < 6).values.union(myRatingsRDD).repartition(numPartitions).persist() val validation = ratings.filter(x => x._1 >= 6 && x._1 < 8).values.repartition(numPartitions).persist() val test = ratings.filter(x => x._1 >= 8).values.persist() val numTraining = training.count() val numValidation = validation.count() val numTest = test.count() println("Training: " + numTraining + " validation: " + numValidation + " test: " + numTest) //訓練不同引數下的模型,並在校驗集中驗證,獲取最佳引數下的模型 val ranks = List(8, 12) val lambdas = List(0.1, 10.0) val numIters = List(10, 20) var bestModel: Option[MatrixFactorizationModel] = None var bestValidationRmse = Double.MaxValue var bestRank = 0 var bestLambda = -1.0 var bestNumIter = -1 for (rank <- ranks; lambda <- lambdas; numIter <- numIters) { val model = ALS.train(training, rank, numIter, lambda) val validationRmse = computeRmse(model, validation, numValidation) println("RMSE(validation) = " + validationRmse + " for the model trained with rank = " + rank + ",lambda = " + lambda + ",and numIter = " + numIter + ".") if (validationRmse < bestValidationRmse) { bestModel = Some(model) bestValidationRmse = validationRmse bestRank = rank bestLambda = lambda bestNumIter = numIter } } //用最佳模型預測測試集的評分,並計算和實際評分之間的均方根誤差(RMSE) val testRmse = computeRmse(bestModel.get, test, numTest) println("The best model was trained with rank = " + bestRank + " and lambda = " + bestLambda + ", and numIter = " + bestNumIter + ", and its RMSE on the test set is " + testRmse + ".") //create a naive baseline and compare it with the best model val meanRating = training.union(validation).map(_.rating).mean val baselineRmse = math.sqrt(test.map(x => (meanRating - x.rating) * (meanRating - x.rating)).reduce(_ + _) / numTest) val improvement = (baselineRmse - testRmse) / baselineRmse * 100 println("The best model improves the baseline by " + "%1.2f".format(improvement) + "%.") //推薦前十部最感興趣的電影,注意要剔除使用者已經評分的電影 val myRatedMovieIds = myRatings.map(_.product).toSet val candidates = sc.parallelize(movies.keys.filter(!myRatedMovieIds.contains(_)).toSeq) val recommendations = bestModel.get .predict(candidates.map((0, _))) .collect .sortBy(-_.rating) .take(10) var i = 1 println("Movies recommended for you:") recommendations.foreach { r => println("%2d".format(i) + ": " + movies(r.product)) i += 1 } sc.stop() } /** 校驗集預測資料和實際資料之間的均方根誤差 **/ def computeRmse(model:MatrixFactorizationModel,data:RDD[Rating],n:Long):Double = { val predictions:RDD[Rating] = model.predict((data.map(x => (x.user,x.product)))) val predictionsAndRatings = predictions.map{ x =>((x.user,x.product),x.rating)} .join(data.map(x => ((x.user,x.product),x.rating))).values math.sqrt(predictionsAndRatings.map( x => (x._1 - x._2) * (x._1 - x._2)).reduce(_+_)/n) } /** 裝載使用者評分檔案 personalRatings.txt **/ def loadRatings(path:String):Seq[Rating] = { val lines = Source.fromFile(path).getLines() val ratings = lines.map{ line => val fields = line.split("::") Rating(fields(0).toInt,fields(1).toInt,fields(2).toDouble) }.filter(_.rating > 0.0) if(ratings.isEmpty){ sys.error("No ratings provided.") }else{ ratings.toSeq } } }
    3、執行程式
    1)設定引數,執行程式

這裡寫圖片描述
這裡有兩個輸入引數:第一個是資料檔案目錄,第二個是測試資料。

     2)程式執行效果---模型訓練過程

這裡寫圖片描述
從執行效果來看,總共有6040個使用者,3706個電影(已經去重),1000209條評分資料;如程式,我們把所有資料分為三部分:60%用於訓練、20%使用者校驗、20%使用者測試模型;接下來是模型在不同引數下的均方根誤差(RMSE)值,以及對應的引數,最優的引數選擇均方根誤差(RMSE—0.8665911…)最小的引數值—即最優引數模型建立;接著,使用20%的測試模型資料來測試模型的好壞,也就是均方根誤差(RMSE),這裡計算的結果為0.86493444…,在最優引數模型基礎上提升了22.32%的準確率。
說明下,其實在資料的劃分上(60%+20%+20%),最好隨機劃分資料,這樣得到的結果更有說服力。

   3)程式執行效果---電影推薦結果

這裡寫圖片描述
最後,給使用者推薦10部自己未看過的電影。

4、總結
這樣,一個簡單的基於模型的電影推薦應用就算OK了。

二、實時推薦架構分析
上面,實現了簡單的推薦系統應用,但是,僅僅實現使用者的定向推薦,在實際應用中價值不是非常大,如果體現價值,最好能夠實現實時或者準實時推薦。
下面,簡單介紹下實時推薦的一個架構:
這裡寫圖片描述

該架構圖取自淘寶Spark On Yarn的實時架構,這裡,給出一些個人的觀點:
架構圖分為三層:離線、近線和線上。
離線部分:主要實現模型的建立。原始資料通過ETL加工清洗,得到目標資料,目標業務資料結合合適的演算法,學習訓練模型,得到最佳的模型。
近線部分:主要使用HBase儲存使用者行為資訊,模型混合系統綜合顯性反饋和隱性反饋的模型處理結果,將最終的結果推薦給使用者。
線上部分:這裡,主要有兩種反饋,顯性和隱性,個人理解,顯性反饋理解為使用者將商品加入購物車,使用者購買商品這些使用者行為;隱性反饋理解為使用者在某個商品上停留的時間,使用者點選哪些商品這些使用者行為。這裡,為了實現實時/準實時操作,使用到了Spark Streaming對資料進行實時處理。(有可能是Flume+Kafka+Spark Streaming架構)