1. 程式人生 > >基於Spark實現推薦演算法-4:基於物品的協同過濾(實現篇)

基於Spark實現推薦演算法-4:基於物品的協同過濾(實現篇)

演算法設計與實現

基於物品的協同過濾又稱Item-Based CF.
基於Spark的Item-Based CF演算法其實現原理和步驟與經典方法基本一致,不同的地方主要在於具體步驟內的並行化計算。

相似度演算法

在Spark MLlib中提供了餘弦相似度的分散式實現,org.apache.spark.mllib.linalg.distributed包中的IndexedRowMatrix是一個分散式矩陣類,其中提供了一個columnSimilarities方法用於計算該矩陣各列之間的餘弦相似度。

預測值計算

採用加權求和的方法計算預測值.

實現步驟:

步驟分解

Step 1:讀取使用者評分資料,設使用者數為U,物品數目為I,將資料轉換為以使用者為行,物品為列的二維矩陣R,R維度為(U×I)。矩陣的每一個數據表示某個使用者對特定物品的評分。

Step 2:計算R每列之間的相似度,可以得到維度(I×I)的矩陣S。S(i,j)表示物品i和物品j之間的相似度。

Step 3:使用預測值計算公式計算使用者對未評分物品的預測評分。得到預測評分矩陣P,P維度為(U×I),P(i,j) 表示使用者i對物品j的預測評分。

Step 4:對使用者i進行推薦。找出P的第i行中評分最高的前K個物品推薦給使用者。K是需要推薦的物品數量。

基於Spark的實現

鑑於從Spark 2.0.0開始基於Dataset的API成為了主要程式設計API,本文采用Dataset的API進行實現,使用的語言為Scala。基於Spark平臺Item-Based CF可以分為4步實現:

Step 1:讀取評分資料集,這裡是從Hive資料倉庫讀取資料,udata是表名,userId、itemId、rating是3個欄位,分別是[使用者ID,物品ID,評分]。這裡按照8:2的比例將資料集分為訓練集和測試集。

//讀取評分資料集
def dataSet(): (DataFrame, DataFrame) = {
    val table = " udata"
    val df = spark.sql(s"select userId,itemId,rating from test.$table")
    val Array(training, test) = df.randomSplit(Array(0.8
, 0.2)) training.cache() test.cache() (training, test) }

Step 2:將資料集轉換成以使用者為行、物品為列的二維評分矩陣,矩陣的每一個行是一個使用者對所有物品的評分。然後求出評分矩陣每列之間的相似度,得到一個以物品ID為行和列,以相似度為資料的矩陣,這個矩陣就是物品相似度矩陣。

//計算相似度矩陣
//評分資料轉換成矩陣
  def parseToMatrix(data: DataFrame): CoordinateMatrix = {
    val parsedData = data.rdd.map {
      case Row(user: Int, item: Int, rate: Int) =>
        MatrixEntry(user, item, rate.toDouble)
    }
    new CoordinateMatrix(parsedData)
  }
  //計算相似度矩陣
  def standardCosine(matrix: CoordinateMatrix): RDD[MatrixEntry] = {
    val similarity = matrix.toIndexedRowMatrix().columnSimilarities()
    val sim = similarity.entries
    sim
  }

Step 3:計算測試集相似物品表。將測試集與訓練集、物品相似度表進行左聯接,得到測試集相似物品表,欄位為[使用者ID,物品ID,實際評分,相似物品評分,相似度]。然後將該表註冊成臨時表testAndSim,並且快取起來,供下一步使用。

//計算測試集相似物品表
val testItemSim = spark.sql(
      """
        |select test.userId,test.itemId,test.rating testRating,training.rating,sim.sim
        |from test
        | left join training on test.userId=training.userId
        | left join itemSim sim on test.itemId=sim.itemX and training.itemId=sim.itemY
      """.stripMargin
    )
    testItemSim.cache()
    testItemSim.createOrReplaceTempView("testAndSim")

Step 4:預測測試集中使用者對物品的評分。對上一步得到的測試集相似物品表testAndSim進行計算,將該表資料按照(userId,itemId)進行分組,取每組相似度前K個物品的評分。最後依照預測值計算公式求出預測值,得到預測評分表testAndPre,其中pre欄位就是對應的預測評分值。

//預測評分
val sqlRank = "select userId,itemId,testRating,rating,sim," +
        "rank() over (partition by userId,itemId order by sim desc) rank\n" +
        "from testAndSim"
val testAndPre = spark.sql(
        "select userId,itemId,first(testRating) rate,nvl(sum(rating*sim)/sum(abs(sim)),0) pre\n" +
          "from( " +
          "  select *" +
          "  from  (" + sqlRank + ") t " +
          s" where rank <= $k " +
          ") w " +
          "group by userId,itemId"
)