1. 程式人生 > >Spark ML自定義選擇最優模型演算法深入剖析-Spark商業ML實戰

Spark ML自定義選擇最優模型演算法深入剖析-Spark商業ML實戰

本套技術專欄是作者(秦凱新)平時工作的總結和昇華,通過從真實商業環境抽取案例進行總結和分享,並給出商業應用的調優建議和叢集環境容量規劃等內容,請持續關注本套部落格。版權宣告:禁止轉載,歡迎學習。QQ郵箱地址:[email protected],如有任何商業交流,可隨時聯絡。

1 自定義選擇最優模型

什麼叫做自定義模型?其實就是不借助Spark官方支援的交叉驗證和訓練驗證拆分,而是根據實際場景進行自定義的RMSE等指標進行綜合分析。奉上試驗美圖:

2 協同過濾(Collaborative Filtering)

  • 顯式的使用者反饋:這類是使用者在網站上自然瀏覽或者使用網站以外,顯式地提供反饋資訊,例如使用者對物品的評分或者對物品的評論。

  • 隱式的使用者反饋:這類是使用者在使用網站是產生的資料,隱式地反映了使用者對物品的喜好,例如使用者購買了某物品,使用者查看了某物品的資訊,等等

  • 顯式的使用者反饋能準確地反映使用者對物品的真實喜好,但需要使用者付出額外的代價;

  • 而隱式的使用者行為,通過一些分析和處理,也能反映使用者的喜好,只是資料不是很精確,有些行為的分析存在較大的噪音。但只要選擇正確的行為特徵,隱式的使用者反饋也能得到很好的效果,只是行為特徵的選擇可能在不同的應用中有很大的不同,

  • Spark ML目前支援基於協同過濾的模型,在這個模型裡,使用者和產品被一組可以用來預測缺失專案的潛在因子來描述。ML 實現了交替最小二乘(ALS)演算法來學習這些潛在的因子,在 ML 中的實現有如下引數:

      numBlocks 是用於並行化計算的使用者和商品的分塊個數 (預設為10)。
      rank 是模型中隱語義因子的個數(預設為10)。
      maxIter 是迭代的次數(預設為10)。
      regParam 是ALS的正則化引數(預設為1.0)。
      implicitPrefs 決定了是用顯性反饋ALS的版本還是用適用隱性反饋資料集的版本(預設是false,即用顯性反饋)。
      alpha 是一個針對於隱性反饋 ALS 版本的引數,這個引數決定了偏好行為強度的基準(預設為1.0)。
      nonnegative 決定是否對最小二乘法使用非負的限制(預設為false)。
    

3 經典協同過濾資料集

  • 在MovieLens提供的電影評分資料分為三個表:評分、使用者資訊和電影資訊,在該系列提供的附屬資料提供大概6000位讀者和100萬個評分資料。

  • 評分資料說明(ratings.data)

      該評分資料總共四個欄位,格式為UserID::MovieID::Rating::Timestamp,
      分為為使用者編號::電影編號::評分::評分時間戳,
      其中各個欄位說明如下:
          使用者編號範圍1~6040
          電影編號1~3952
          電影評分為五星評分,範圍0~5
          評分時間戳單位秒
          每個使用者至少有20個電影評分
    
          1::1193::5::978300760
          1::661::3::978302109
          1::914::3::978301968
    
  • 2.使用者資訊(users.dat)

      使用者資訊五個欄位,格式為UserID::Gender::Age::Occupation::Zip-code,
      分為為使用者編號::性別::年齡::職業::郵編。
      其中各個欄位說明如下:
      
      使用者編號範圍1~6040
      性別,其中M為男性,F為女性
      不同的數字代表不同的年齡範圍,如:25代表25~34歲範圍
      職業資訊,在測試資料中提供了21中職業分類
      地區郵編
      
      1::F::1::10::48067
      2::M::56::16::70072
      3::M::25::15::55117
    
  • 3 電影資訊(movies.dat)

      電影資料分為三個欄位,格式為MovieID::Title::Genres,分為
      電影編號::電影名::電影類別
      其中各個欄位說明如下:
      
      電影編號1~3952
      由IMDB提供電影名稱,其中包括電影上映年份
      電影分類,這裡使用實際分類名非編號,如:Action、Crime等
      
      1::Toy Story (1995)::Animation|Children's|Comedy
      2::Jumanji (1995)::Adventure|Children's|Fantasy
      3::Grumpier Old Men (1995)::Comedy|Romance
    

4 自定義模型

import org.apache.spark.sql.Row
import org.apache.spark.ml.recommendation.{ALS,ALSModel}
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.DataFrame
import org.apache.spark.ml.evaluation.RegressionEvaluator
import spark.implicits._

case class Rating(userId: Int, movieId: Int, rating: Float, timestamp: Long)

1 根據資料結構定義資料規範
def parseRating(str: String): Rating = {     
val fields = str.split("::")   
assert(fields.size == 4)  
Rating(fields(0).toInt, fields(1).toInt, fields(2).toFloat, fields(3).toLong) }
 
val ratings = spark.sparkContext.textFile("/data/mllib/als/sample_movielens_ratings.txt").map(parseRating).toDF()
ratings.show()

+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     0|      2|   3.0|1424380312|
|     0|      3|   1.0|1424380312|
|     0|      5|   2.0|1424380312|
|     0|      9|   4.0|1424380312|

2 分割資料集
val splits = ratings.randomSplit(Array(0.6, 0.2, 0.2),12)
val training =splits(0).cache()
val validation=splits(1).toDF.cache()
val test =splits(2).toDF.cache()


3 訓練不同引數下的模型,並集中驗證,獲取最佳引數下的模型
val numValidation =validation.count
val numTraining =training.count
val numTest =test.count

val ranks = List(8, 12,13)
val lambdas = List(0.1, 10.0, 12.0)
val numIters = List(10, 20, 20)
var bestModel: Option[ALSModel] = None
var bestValidationRmse = Double.MaxValue
var bestRank = 0
var bestLambda = -1.0
var bestNumIter = -1

4  校驗集預測資料和實際資料之間的均方根誤差運算元(參看上一篇指標定義)

   模型預測結果
    +------+-------+------+----------+-----------+                                  
    |userId|movieId|rating| timestamp| prediction|
    +------+-------+------+----------+-----------+
    |    13|     31|   1.0|1424380312|    2.35308|
    |     0|     31|   1.0|1424380312|  2.5408225|
    |    18|     31|   1.0|1424380312|  1.3848196|
    |     4|     85|   1.0|1424380312|  2.4104187|
    |     8|     85|   5.0|1424380312|  3.9386258|
    |    23|     85|   1.0|1424380312| -0.7795656|
    |    29|     85|   1.0|1424380312|0.118287265|
    |    28|     65|   1.0|1424380312|  4.6700068|

  def computeRmse(model: ALSModel, data: DataFrame, n: Long): Double = {
    val predictions = model.transform(data)
    
    //計算過程:((userId,movieId),(rating,prediction)) ====> (rating,prediction)
    val predictionsAndRatings = predictions.rdd.map{x => ((x(0), x(1)),x(2))}
      .join(predictions.rdd.map(x => ((x(0), x(1)), x(4))))
      .values
    math.sqrt(predictionsAndRatings.map(x => (x._1.toString.toDouble - x._2.toString.toDouble) * (x._1.toString.toDouble - x._2.toString.toDouble)).reduce(_ + _) / n)
  }
  
 5  計算最優模型的遍歷測試
  
  for (rank <- ranks; lambda <- lambdas; numIter <- numIters) {
  
  val als = new ALS().setMaxIter(numIter).setRegParam(lambda).setRank(rank).setUserCol("userId"). setItemCol("movieId").setRatingCol("rating")
  val model = als.fit(training)
  
  
  模型作用於驗證集進行驗證
  val validationRmse = computeRmse(model, validation, numValidation)
  println("RMSE (模型評估) = " + validationRmse + " 引數 rank = "
    + rank + ", lambda = " + lambda + ", and numIter = " + numIter + ".")
    
  if (validationRmse < bestValidationRmse) {
    bestModel = Some(model)
    bestValidationRmse = validationRmse
    bestRank = rank
    bestLambda = lambda
    bestNumIter = numIter
  }
}

RMSE (validation) = 1.0747445332055616 for the model trained with rank = 8, lambda = 0.1, and numIter = 10.
RMSE (validation) = 1.045271064998892 for the model trained with rank = 8, lambda = 0.1, and numIter = 20.
RMSE (validation) = 2.041241452319315 for the model trained with rank = 8, lambda = 10.0, and numIter = 10.
RMSE (validation) = 2.041241452319315 for the model trained with rank = 8, lambda = 10.0, and numIter = 20.
RMSE (validation) = 1.0213510038051121 for the model trained with rank = 12, lambda = 0.1, and numIter = 10.
RMSE (validation) = 1.005770421453116 for the model trained with rank = 12, lambda = 0.1, and numIter = 20.
RMSE (validation) = 2.041241452319315 for the model trained with rank = 12, lambda = 10.0, and numIter = 10.
RMSE (validation) = 2.041241452319315 for the model trained with rank = 12, lambda = 10.0, and numIter = 20.



6  最優模型作用於測試集

val testRmse = computeRmse(bestModel.get, test, numTest)
println("最優模型引數: rank = " + bestRank + " and lambda = " + bestLambda  + ", and numIter = " + bestNumIter + ", 最優模型均方根誤差為 " + testRmse + ".")
 
最優模型引數: rank = 12 and lambda = 0.1, and numIter = 20, 最優模型均方根誤差為 0.9519301678208573.

5 飯後甜點(基於RegressionEvaluator評估器)

使用ALS來建立推薦模型,這裡我們構建了兩個模型,一個是顯性反饋,一個是隱性反饋

1 構建模型
val Array(training, test) = ratings.randomSplit(Array(0.8, 0.2))

顯性反饋 
val alsExplicit = new ALS().setMaxIter(5).setRegParam(0.01).setUserCol("userId"). setItemCol("movieId").setRatingCol("rating")

隱性反饋
val alsImplicit = new ALS().setMaxIter(5).setRegParam(0.01).setImplicitPrefs(true). setUserCol("userId").setItemCol("movieId").setRatingCol("rating")

val modelExplicit = alsExplicit.fit(training)
val modelImplicit = alsImplicit.fit(training)

2 模型預測
val predictionsExplicit = modelExplicit.transform(test)
predictionsExplicit.show()

------+-------+------+----------+----------+                                   
|userId|movieId|rating| timestamp|prediction|
+------+-------+------+----------+----------+
|    29|     31|   1.0|1424380312| 1.6074163|
|     0|     31|   1.0|1424380312| 1.7223389|
|    28|     85|   1.0|1424380312|  4.469816|
|    26|     85|   1.0|1424380312| 1.0373509|
|    15|     85|   1.0|1424380312|  5.004366|
|    23|     85|   1.0|1424380312|-1.0499454|
|     5|     65|   2.0|1424380312| 0.6295809|


val predictionsImplicit = modelImplicit.transform(test)
predictionsImplicit.show()

+------+-------+------+----------+------------+                                 
|userId|movieId|rating| timestamp|  prediction|
+------+-------+------+----------+------------+
|    29|     31|   1.0|1424380312|  0.24691844|
|     0|     31|   1.0|1424380312|  0.18451405|
|    28|     85|   1.0|1424380312|   0.6521389|
|    26|     85|   1.0|1424380312|   0.9124242|
|    15|     85|   1.0|1424380312|   0.6542819|
|    23|     85|   1.0|1424380312|  0.34113467|

3 模型評估
scala>val evaluator = new RegressionEvaluator().setMetricName("rmse").setLabelCol("rating"). setPredictionCol("prediction")
evaluator: org.apache.spark.ml.evaluation.RegressionEvaluator = regEval_bfdf233c6bad

scala>val rmseExplicit = evaluator.evaluate(predictionsExplicit)
rmseExplicit: Double = 1.6652229504120535                                       

scala>val rmseImplicit = evaluator.evaluate(predictionsImplicit)
rmseImplicit: Double = 1.7925024428799021 

結語

花了大量時間實現了自定義模型評估演算法,不管對你有沒有用,我覺得通過例項和現場試驗,我終於強化了我的知識體系。辛苦成文,各自珍惜,謝謝!奉上美圖。

秦凱新 於深圳 201811181817