Spark Mllib構建簡單的電影推薦系統(轉)
基於模型的協同過濾應用–電影推薦
資料來源:資料
本文實現對使用者推薦電影的簡單應用。
1、測試資料描述
本次測試資料主要包括四個資料檔案:(詳細的資料描述參見README檔案)
1)使用者資料檔案
使用者ID::性別::年齡::職業編號::郵編
2)電影資料檔案
電影ID::電影名稱::電影種類
3)評分資料檔案
使用者ID::電影ID::評分::時間
4)測試資料
使用者ID::電影ID::評分::時間
這裡,前三個資料檔案用於模型訓練,第四個資料檔案用於測試模型。
2、實現程式碼:
/**
* Created by Administrator on 2017/4/6.
*/
import org.apache.log4j.{Level, Logger}
import org.apache.spark.mllib.recommendation.{ALS, MatrixFactorizationModel, Rating}
import org.apache.spark.rdd._
import org.apache.spark.{SparkContext, SparkConf}
import scala.io .Source
object RecommandDemo1 {
def main(args:Array[String]) {
//遮蔽不必要的日誌顯示在終端上
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF)
//設定執行環境
val sparkConf = new SparkConf().setAppName("MovieLensALS" ).setMaster("local[3]")
val sc = new SparkContext(sparkConf)
//裝載使用者評分,該評分由評分器生成(即生成檔案personalRatings.txt)
val myRatings = loadRatings(args(1))
val myRatingsRDD = sc.parallelize(myRatings, 1)
//樣本資料目錄
val movielensHomeDir = args(0)
//裝載樣本評分資料,其中最後一列Timestamp取除10的餘數作為key,Rating為值,即(Int,Rating)
val ratings = sc.textFile(movielensHomeDir + "/ratings.dat").map {
line =>
val fields = line.split("::")
// format: (timestamp % 10, Rating(userId, movieId, rating))
(fields(3).toLong % 10, Rating(fields(0).toInt, fields(1).toInt, fields(2).toDouble))
}
//裝載電影目錄對照表(電影ID->電影標題)
val movies = sc.textFile(movielensHomeDir + "/movies.dat").map {
line =>
val fields = line.split("::")
// format: (movieId, movieName)
(fields(0).toInt, fields(1))
}.collect().toMap
//統計有使用者數量和電影數量以及使用者對電影的評分數目
val numRatings = ratings.count()
val numUsers = ratings.map(_._2.user).distinct().count()
val numMovies = ratings.map(_._2.product).distinct().count()
println("Got " + numRatings + " ratings from " + numUsers + " users " + numMovies + " movies")
//將樣本評分表以key值切分成3個部分,分別用於訓練 (60%,並加入使用者評分), 校驗 (20%), and 測試 (20%)
//該資料在計算過程中要多次應用到,所以cache到記憶體
val numPartitions = 4
val training = ratings.filter(x => x._1 < 6).values.union(myRatingsRDD).repartition(numPartitions).persist()
val validation = ratings.filter(x => x._1 >= 6 && x._1 < 8).values.repartition(numPartitions).persist()
val test = ratings.filter(x => x._1 >= 8).values.persist()
val numTraining = training.count()
val numValidation = validation.count()
val numTest = test.count()
println("Training: " + numTraining + " validation: " + numValidation + " test: " + numTest)
//訓練不同引數下的模型,並在校驗集中驗證,獲取最佳引數下的模型
val ranks = List(8, 12)
val lambdas = List(0.1, 10.0)
val numIters = List(10, 20)
var bestModel: Option[MatrixFactorizationModel] = None
var bestValidationRmse = Double.MaxValue
var bestRank = 0
var bestLambda = -1.0
var bestNumIter = -1
for (rank <- ranks; lambda <- lambdas; numIter <- numIters) {
val model = ALS.train(training, rank, numIter, lambda)
val validationRmse = computeRmse(model, validation, numValidation)
println("RMSE(validation) = " + validationRmse + " for the model trained with rank = "
+ rank + ",lambda = " + lambda + ",and numIter = " + numIter + ".")
if (validationRmse < bestValidationRmse) {
bestModel = Some(model)
bestValidationRmse = validationRmse
bestRank = rank
bestLambda = lambda
bestNumIter = numIter
}
}
//用最佳模型預測測試集的評分,並計算和實際評分之間的均方根誤差(RMSE)
val testRmse = computeRmse(bestModel.get, test, numTest)
println("The best model was trained with rank = " + bestRank + " and lambda = " + bestLambda
+ ", and numIter = " + bestNumIter + ", and its RMSE on the test set is " + testRmse + ".")
//create a naive baseline and compare it with the best model
val meanRating = training.union(validation).map(_.rating).mean
val baselineRmse = math.sqrt(test.map(x => (meanRating - x.rating) * (meanRating - x.rating)).reduce(_ + _) / numTest)
val improvement = (baselineRmse - testRmse) / baselineRmse * 100
println("The best model improves the baseline by " + "%1.2f".format(improvement) + "%.")
//推薦前十部最感興趣的電影,注意要剔除使用者已經評分的電影
val myRatedMovieIds = myRatings.map(_.product).toSet
val candidates = sc.parallelize(movies.keys.filter(!myRatedMovieIds.contains(_)).toSeq)
val recommendations = bestModel.get
.predict(candidates.map((0, _)))
.collect
.sortBy(-_.rating)
.take(10)
var i = 1
println("Movies recommended for you:")
recommendations.foreach { r =>
println("%2d".format(i) + ": " + movies(r.product))
i += 1
}
sc.stop()
}
/** 校驗集預測資料和實際資料之間的均方根誤差 **/
def computeRmse(model:MatrixFactorizationModel,data:RDD[Rating],n:Long):Double = {
val predictions:RDD[Rating] = model.predict((data.map(x => (x.user,x.product))))
val predictionsAndRatings = predictions.map{ x =>((x.user,x.product),x.rating)}
.join(data.map(x => ((x.user,x.product),x.rating))).values
math.sqrt(predictionsAndRatings.map( x => (x._1 - x._2) * (x._1 - x._2)).reduce(_+_)/n)
}
/** 裝載使用者評分檔案 personalRatings.txt **/
def loadRatings(path:String):Seq[Rating] = {
val lines = Source.fromFile(path).getLines()
val ratings = lines.map{
line =>
val fields = line.split("::")
Rating(fields(0).toInt,fields(1).toInt,fields(2).toDouble)
}.filter(_.rating > 0.0)
if(ratings.isEmpty){
sys.error("No ratings provided.")
}else{
ratings.toSeq
}
}
}
3、執行程式
1)設定引數,執行程式
這裡有兩個輸入引數:第一個是資料檔案目錄,第二個是測試資料。
2)程式執行效果---模型訓練過程
從執行效果來看,總共有6040個使用者,3706個電影(已經去重),1000209條評分資料;如程式,我們把所有資料分為三部分:60%用於訓練、20%使用者校驗、20%使用者測試模型;接下來是模型在不同引數下的均方根誤差(RMSE)值,以及對應的引數,最優的引數選擇均方根誤差(RMSE—0.8665911…)最小的引數值—即最優引數模型建立;接著,使用20%的測試模型資料來測試模型的好壞,也就是均方根誤差(RMSE),這裡計算的結果為0.86493444…,在最優引數模型基礎上提升了22.32%的準確率。
說明下,其實在資料的劃分上(60%+20%+20%),最好隨機劃分資料,這樣得到的結果更有說服力。
3)程式執行效果---電影推薦結果
最後,給使用者推薦10部自己未看過的電影。
4、總結
這樣,一個簡單的基於模型的電影推薦應用就算OK了。
二、實時推薦架構分析
上面,實現了簡單的推薦系統應用,但是,僅僅實現使用者的定向推薦,在實際應用中價值不是非常大,如果體現價值,最好能夠實現實時或者準實時推薦。
下面,簡單介紹下實時推薦的一個架構:
該架構圖取自淘寶Spark On Yarn的實時架構,這裡,給出一些個人的觀點:
架構圖分為三層:離線、近線和線上。
離線部分:主要實現模型的建立。原始資料通過ETL加工清洗,得到目標資料,目標業務資料結合合適的演算法,學習訓練模型,得到最佳的模型。
近線部分:主要使用HBase儲存使用者行為資訊,模型混合系統綜合顯性反饋和隱性反饋的模型處理結果,將最終的結果推薦給使用者。
線上部分:這裡,主要有兩種反饋,顯性和隱性,個人理解,顯性反饋理解為使用者將商品加入購物車,使用者購買商品這些使用者行為;隱性反饋理解為使用者在某個商品上停留的時間,使用者點選哪些商品這些使用者行為。這裡,為了實現實時/準實時操作,使用到了Spark Streaming對資料進行實時處理。(有可能是Flume+Kafka+Spark Streaming架構)