基於Spark實現推薦演算法-4:基於物品的協同過濾(實現篇)
演算法設計與實現
基於物品的協同過濾又稱Item-Based CF.
基於Spark的Item-Based CF演算法其實現原理和步驟與經典方法基本一致,不同的地方主要在於具體步驟內的並行化計算。
相似度演算法
在Spark MLlib中提供了餘弦相似度的分散式實現,org.apache.spark.mllib.linalg.distributed包中的IndexedRowMatrix是一個分散式矩陣類,其中提供了一個columnSimilarities方法用於計算該矩陣各列之間的餘弦相似度。
預測值計算
採用加權求和的方法計算預測值.
實現步驟:
步驟分解
Step 1:讀取使用者評分資料,設使用者數為U,物品數目為I,將資料轉換為以使用者為行,物品為列的二維矩陣R,R維度為(U×I)。矩陣的每一個數據表示某個使用者對特定物品的評分。
Step 2:計算R每列之間的相似度,可以得到維度(I×I)的矩陣S。S(i,j)表示物品i和物品j之間的相似度。
Step 3:使用預測值計算公式計算使用者對未評分物品的預測評分。得到預測評分矩陣P,P維度為(U×I),P(i,j) 表示使用者i對物品j的預測評分。
Step 4:對使用者i進行推薦。找出P的第i行中評分最高的前K個物品推薦給使用者。K是需要推薦的物品數量。
基於Spark的實現
鑑於從Spark 2.0.0開始基於Dataset的API成為了主要程式設計API,本文采用Dataset的API進行實現,使用的語言為Scala。基於Spark平臺Item-Based CF可以分為4步實現:
Step 1:讀取評分資料集,這裡是從Hive資料倉庫讀取資料,udata是表名,userId、itemId、rating是3個欄位,分別是[使用者ID,物品ID,評分]。這裡按照8:2的比例將資料集分為訓練集和測試集。
//讀取評分資料集
def dataSet(): (DataFrame, DataFrame) = {
val table = " udata"
val df = spark.sql(s"select userId,itemId,rating from test.$table")
val Array(training, test) = df.randomSplit(Array(0.8 , 0.2))
training.cache()
test.cache()
(training, test)
}
Step 2:將資料集轉換成以使用者為行、物品為列的二維評分矩陣,矩陣的每一個行是一個使用者對所有物品的評分。然後求出評分矩陣每列之間的相似度,得到一個以物品ID為行和列,以相似度為資料的矩陣,這個矩陣就是物品相似度矩陣。
//計算相似度矩陣
//評分資料轉換成矩陣
def parseToMatrix(data: DataFrame): CoordinateMatrix = {
val parsedData = data.rdd.map {
case Row(user: Int, item: Int, rate: Int) =>
MatrixEntry(user, item, rate.toDouble)
}
new CoordinateMatrix(parsedData)
}
//計算相似度矩陣
def standardCosine(matrix: CoordinateMatrix): RDD[MatrixEntry] = {
val similarity = matrix.toIndexedRowMatrix().columnSimilarities()
val sim = similarity.entries
sim
}
Step 3:計算測試集相似物品表。將測試集與訓練集、物品相似度表進行左聯接,得到測試集相似物品表,欄位為[使用者ID,物品ID,實際評分,相似物品評分,相似度]。然後將該表註冊成臨時表testAndSim,並且快取起來,供下一步使用。
//計算測試集相似物品表
val testItemSim = spark.sql(
"""
|select test.userId,test.itemId,test.rating testRating,training.rating,sim.sim
|from test
| left join training on test.userId=training.userId
| left join itemSim sim on test.itemId=sim.itemX and training.itemId=sim.itemY
""".stripMargin
)
testItemSim.cache()
testItemSim.createOrReplaceTempView("testAndSim")
Step 4:預測測試集中使用者對物品的評分。對上一步得到的測試集相似物品表testAndSim進行計算,將該表資料按照(userId,itemId)進行分組,取每組相似度前K個物品的評分。最後依照預測值計算公式求出預測值,得到預測評分表testAndPre,其中pre欄位就是對應的預測評分值。
//預測評分
val sqlRank = "select userId,itemId,testRating,rating,sim," +
"rank() over (partition by userId,itemId order by sim desc) rank\n" +
"from testAndSim"
val testAndPre = spark.sql(
"select userId,itemId,first(testRating) rate,nvl(sum(rating*sim)/sum(abs(sim)),0) pre\n" +
"from( " +
" select *" +
" from (" + sqlRank + ") t " +
s" where rank <= $k " +
") w " +
"group by userId,itemId"
)