
Collaborative Filtering Recommendation with Spark ML

This article is aimed at newcomers to Spark and to recommender systems (myself included).

Setting up the Spark environment

Spark runs on Java 8+, Python 2.7+/3.4+ and R 3.1+. For the Scala API, Spark 2.3.1 uses Scala 2.11. You will need to use a compatible Scala version (2.11.x).

Using Spark in your own project

Scala and Java users can include Spark in their projects using its Maven coordinates and in the future Python users can also install Spark from PyPI
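For example, a minimal build.sbt along these lines would pull Spark into an sbt project. The Spark 2.3.1 / Scala 2.11 versions are assumptions taken from the quote above; adjust them to match your cluster:

scalaVersion := "2.11.12"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql"   % "2.3.1",   // DataFrames / Datasets
  "org.apache.spark" %% "spark-mllib" % "2.3.1"    // spark.ml, including ALS
)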

Running Spark locally

Spark runs on both Windows and UNIX-like systems (e.g. Linux, Mac OS). It’s easy to run locally on one machine — all you need is to have java installed on your system PATH, or the JAVA_HOME environment variable pointing to a Java installation.

Install JDK 8

Download Spark

Environment variables

export JAVA_HOME=$(/usr/libexec/java_home)
export SPARK_HOME={YOUR_SPARK_HOME}
export PATH=$SPARK_HOME/bin:$PATH

Test the Spark demo

run-example SparkPi 10

If everything is set up correctly, you will see log output like the following:

2018-09-01 13:30:09 INFO  TaskSchedulerImpl:54 - Removed TaskSet 0.0, whose tasks have all completed, from pool
2018-09-01 13:30:09 INFO  DAGScheduler:54 - ResultStage 0 (reduce at SparkPi.scala:38) finished in 0.785 s
2018-09-01 13:30:09 INFO  DAGScheduler:54 - Job 0 finished: reduce at SparkPi.scala:38, took 0.888303 s
Pi is roughly 3.142015142015142
2018-09-01 13:30:09 INFO  AbstractConnector:318 - Stopped Spark@3088660d{HTTP/1.1,[http/1.1]}{0.0.0.0:4040}
2018-09-01 13:30:09 INFO  SparkUI:54 - Stopped Spark web UI at http://192.168.0.102:4040
2018-09-01 13:30:09 INFO  MapOutputTrackerMasterEndpoint:54 - MapOutputTrackerMasterEndpoint stopped!
2018-09-01 13:30:09 INFO  MemoryStore:54 - MemoryStore cleared
2018-09-01 13:30:09 INFO  BlockManager:54 - BlockManager stopped
2018-09-01 13:30:09 INFO  BlockManagerMaster:54 - BlockManagerMaster stopped
2018-09-01 13:30:09 INFO  OutputCommitCoordinator$OutputCommitCoordinatorEndpoint:54 - OutputCommitCoordinator stopped!
2018-09-01 13:30:09 INFO  SparkContext:54 - Successfully stopped SparkContext
2018-09-01 13:30:09 INFO  ShutdownHookManager:54 - Shutdown hook called
2018-09-01 13:30:09 INFO  ShutdownHookManager:54 - Deleting directory /private/var/folders/nb/59yflyk555q86ny7pj84kjnm0000gn/T/spark-ef6810c7-0fbd-4ec6-a1c4-da3d41963d43
2018-09-01 13:30:09 INFO  ShutdownHookManager:54 - Deleting directory /private/var/folders/nb/59yflyk555q86ny7pj84kjnm0000gn/T/spark-1cdbae12-c127-47cb-8571-7d546f088a45

spark-shell

You can also run Spark interactively through a modified version of the Scala shell. This is a great way to learn the framework.

spark-shell --master local[2]
spark-shell --master "local[4]"

The --master option specifies the master URL for a distributed cluster, or local to run locally with one thread, or local[N] to run locally with N threads. You should start by using local for testing. For a full list of options, run Spark shell with the --help option.
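Once the shell is up, a couple of quick commands confirm the session is wired correctly (the values in the comments are just what you would typically see, not guaranteed output):

spark.version                    // e.g. "2.3.1"
sc.master                        // e.g. "local[4]"
sc.parallelize(1 to 100).sum()   // 5050.0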

Since my future work involves recommendation algorithms, I went straight to collaborative filtering with spark ml.

Collaborative filtering in practice

spark-shell --master "local[4]"

Enter the following script:

import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.recommendation.ALS

case class Rating(userId: Int, movieId: Int, rating: Float, timestamp: Long)
def parseRating(str: String): Rating = {
  val fields = str.split("::")
  assert(fields.size == 4)
  Rating(fields(0).toInt, fields(1).toInt, fields(2).toFloat, fields(3).toLong)
}

// Note: this line needs to be typed in by hand
val ratings = spark.read.textFile("data/mllib/als/sample_movielens_ratings.txt").map(parseRating).toDF()

// Split into training and test sets
val Array(training, test) = ratings.randomSplit(Array(0.8, 0.2))



// Build the recommendation model using ALS on the training data
val als = new ALS().setMaxIter(5).setRegParam(0.01).setUserCol("userId").setItemCol("movieId").setRatingCol("rating")
val model = als.fit(training)

// Evaluate the model by computing the RMSE on the test data
// Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
model.setColdStartStrategy("drop")
val predictions = model.transform(test)

val evaluator = new RegressionEvaluator().setMetricName("rmse").setLabelCol("rating").setPredictionCol("prediction")
val rmse = evaluator.evaluate(predictions)
println(s"Root-mean-square error = $rmse")

// Generate top 10 movie recommendations for each user
val userRecs = model.recommendForAllUsers(10)
// Generate top 10 user recommendations for each movie
val movieRecs = model.recommendForAllItems(10)

// Generate top 10 movie recommendations for a specified set of users
val users = ratings.select(als.getUserCol).distinct().limit(3)
val userSubsetRecs = model.recommendForUserSubset(users, 10)
// Generate top 10 user recommendations for a specified set of movies
val movies = ratings.select(als.getItemCol).distinct().limit(3)
val movieSubSetRecs = model.recommendForItemSubset(movies, 10)

Run it step by step; if everything goes well, you can inspect the recommendations at the end:

userSubsetRecs.show(false)

Output:

+------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|userId|recommendations                                                                                                                                                       |
+------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|28    |[[55, 6.5194335], [48, 5.5661573], [79, 5.239481], [81, 5.092548], [42, 5.0799093], [92, 5.032366], [50, 5.024519], [12, 4.713505], [68, 4.4784117], [10, 4.3225317]] |
|26    |[[46, 7.1047177], [32, 6.8479147], [53, 6.2369795], [30, 5.704584], [94, 5.611696], [7, 5.046065], [23, 4.9733334], [22, 4.9221854], [90, 4.765525], [87, 4.404424]]  |
|27    |[[18, 3.7961326], [39, 3.2431319], [46, 3.225921], [38, 3.1762652], [51, 3.1215887], [80, 3.118711], [34, 3.1077363], [1, 3.0809362], [75, 3.075434], [62, 3.0638745]]|
+------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------+

Reading the file

spark.read.textFile
+--------------------+
|               value|
+--------------------+
| 0::2::3::1424380312|
| 0::3::1::1424380312|

spark.read.textFile returns a Dataset[String].
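A quick way to see this for yourself, using the same sample file as above (the commented type is what spark-shell prints for the value):

val lines = spark.read.textFile("data/mllib/als/sample_movielens_ratings.txt")
// lines: org.apache.spark.sql.Dataset[String]
lines.show(2, false)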

Input format

ratings.show(false)

The demo defines a case class and a parser that maps each line onto the case class's fields; mapping the Dataset with that parser and calling toDF() yields a structured DataFrame (see the schema check after the table below).

+------+-------+------+----------+
|userId|movieId|rating|timestamp |
+------+-------+------+----------+
|0     |2      |3.0   |1424380312|
|0     |3      |1.0   |1424380312|
|0     |5      |2.0   |1424380312|
|0     |9      |4.0   |1424380312|
+------+-------+------+----------+
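To double-check the structure produced by the case class mapping, you can print the schema; with the Rating class above it should look roughly like this:

ratings.printSchema()
// root
//  |-- userId: integer (nullable = false)
//  |-- movieId: integer (nullable = false)
//  |-- rating: float (nullable = false)
//  |-- timestamp: long (nullable = false)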

case class?

When you declare a case class, the following happens automatically:

Each constructor parameter becomes a val, unless it is explicitly declared as var (which is not recommended);
An apply method is provided in the companion object, so instances can be constructed without the new keyword;
An unapply method is provided so that pattern matching works;
toString, equals, hashCode, and copy methods are generated, unless definitions for them are supplied explicitly.

When we instantiate a class with zero or more arguments, the compiler calls the apply method during instantiation. We can define apply in both classes and objects.

As mentioned above, unapply extracts the values we are querying for; it is the inverse operation of apply. When we use a match expression against an extractor object, unapply runs automatically. A small sketch follows.
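Here is a minimal sketch of all of this in action, reusing the Rating class defined earlier:

case class Rating(userId: Int, movieId: Int, rating: Float, timestamp: Long)

// apply: the generated companion object lets us skip `new`
val r = Rating(0, 2, 3.0f, 1424380312L)

// unapply: pattern matching pulls the fields back out
r match {
  case Rating(user, movie, score, _) =>
    println(s"user $user rated movie $movie with $score")
}

// copy: change one field, keep the rest
val r2 = r.copy(rating = 4.0f)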

Analysis of Spark collaborative filtering

Collaborative filtering is commonly used for recommender systems. These techniques aim to fill in the missing entries of a user-item association matrix. spark.ml currently supports model-based collaborative filtering, in which users and products are described by a small set of latent factors that can be used to predict missing entries. spark.ml uses the alternating least squares (ALS) algorithm to learn these latent factors. The implementation in spark.ml has the following parameters:

numBlocks is the number of blocks the users and items will be partitioned into in order to parallelize computation (defaults to 10). Degree of parallelism.
rank is the number of latent factors in the model (defaults to 10). Number of latent factors.
maxIter is the maximum number of iterations to run (defaults to 10). Maximum number of iterations.
regParam specifies the regularization parameter in ALS (defaults to 1.0). Regularization penalty to guard against overfitting.
implicitPrefs specifies whether to use the explicit feedback ALS variant or one adapted for implicit feedback data (defaults to false which means using explicit feedback). Implicit feedback switch.
alpha is a parameter applicable to the implicit feedback variant of ALS that governs the baseline confidence in preference observations (defaults to 1.0). Baseline confidence for implicit feedback.
nonnegative specifies whether or not to use nonnegative constraints for least squares (defaults to false). Whether to constrain the factors to be nonnegative.

The goal of collaborative filtering is to fill in the missing entries of the user-item matrix. spark.ml currently supports model-based collaborative filtering: users and items are described by a small set of latent factors that are used to predict the missing entries, and Spark uses the ALS algorithm to learn these latent factors. A configuration sketch follows below.
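As a reference, here is a sketch that sets all of the parameters above explicitly on the ALS estimator. The values simply mirror the demo where it sets them and the documented defaults elsewhere; they are not tuned choices, and the val name alsExplicit is just illustrative:

import org.apache.spark.ml.recommendation.ALS

val alsExplicit = new ALS()
  .setNumBlocks(10)           // numBlocks: partitioning for parallel computation
  .setRank(10)                // rank: number of latent factors
  .setMaxIter(5)              // maxIter: maximum number of iterations
  .setRegParam(0.01)          // regParam: regularization strength
  .setImplicitPrefs(false)    // explicit feedback; set true for implicit data
  .setAlpha(1.0)              // alpha: only relevant when implicitPrefs = true
  .setNonnegative(false)      // whether to constrain factors to be nonnegative
  .setUserCol("userId")
  .setItemCol("movieId")
  .setRatingCol("rating")

val modelExplicit = alsExplicit.fit(training)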