1. 程式人生 > >Spark資料探勘例項1:基於 Audioscrobbler 資料集音樂推薦

Spark資料探勘例項1:基於 Audioscrobbler 資料集音樂推薦

本例項來源於《Spark高階資料分析》,這是一個很好的spark資料探勘的例項。從經驗上講,推薦引擎屬於大規模機器學習,在日常購物中大家或許深有體會,比如:你在淘寶上瀏覽了一些商品,或者購買了一些商品,那麼淘寶就會根據你的偏好給你推薦一些其他類似的商品。然而,相比較其他機器學習演算法,推薦引擎的輸出更加的直觀,有時候的推薦效果讓人吃驚。作為機器學習開篇文章,本篇文章會系統的介紹基於Audioscrobbler資料集的音樂推薦。

資料集介紹

Audioscrobbler資料集是一個公開發布的資料集,讀者可以在(https://github.com/libaoquan95/aasPractice/tree/master/c3/profiledata_06-May-2005

)網站獲取。資料集主要有三部分組成,user_artist_data.txt檔案是主要的資料集檔案記錄了約2420條使用者id、藝術家id以及使用者收聽藝術家歌曲的次數資料,包含141000個使用者和160萬個藝術家;artist_data.txt檔案記錄了藝術家id和對應的名字;artist_alias.txt記錄了藝術家id和對應的別稱id。

 

推薦演算法介紹

由於所選取的資料集只記錄了使用者和歌曲之間的互動情況,除了藝術家名字之外沒有其他資訊。因此要找的學習演算法不需要使用者和藝術家的屬性資訊,這類演算法通常被稱為協同過濾。如果根據兩個使用者的年齡相同來判斷他們可能具有相似的偏好,這不叫協同過濾。相反,根據兩個使用者播放過許多相同歌曲來判斷他們可能都喜歡某首歌,這是協調過濾。

本篇所用的演算法在數學上稱為迭代最小二乘,把使用者播放資料當成矩陣A,矩陣低i行第j列上的元素的值,代表使用者i播放藝術家j的音樂。矩陣A是稀疏的,絕大多數元素是0,演算法將A分解成兩個小矩陣X和Y,既A=XYT,X代表使用者特徵矩陣,Y代表特徵藝術家矩陣。兩個矩陣的乘積當做使用者-藝術家關係矩陣的估計。可以通過下邊一組圖直觀的反映:

現在假如有5個聽眾,音樂有5首,那麼A是一個5*5的矩陣,假如評分如下:

圖2.1 使用者訂閱矩陣

假如d是三個屬性,那麼X的矩陣如下:

 

圖2.2 使用者-特徵矩陣

Y的矩陣如下:

圖2.3 特徵-電影矩陣

實際的求解過程中通常先隨機的固定矩陣Y,則,為提高計算效率,通常採用平行計算X的每一行,既。得到X之後,再反求出Y,不斷的交替迭代,最終使得XYT與A的平方誤差小於指定閾值,停止迭代,得到最終的X(代表使用者特徵矩陣)和Y矩陣(代表特徵藝術家矩陣)。在根據最終X和Y矩陣結果,向用戶進行推薦。

 

資料準備

首先將樣例資料上傳到HDFS,如果想要在本地測試這些功能的話,需要記憶體數量至少 6g, 當然可以通過減少資料量來達到通用的測試。

object RunRecommender {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf();
    conf.setMaster("local[*]")
    val spark = SparkSession.builder().config(conf).getOrCreate()

    // Optional, but may help avoid errors due to long lineage
   // spark.sparkContext.setCheckpointDir("hdfs:///tmp/")
    spark.sparkContext.setCheckpointDir("d:///tmp/")

    //val base = "hdfs:///user/ds/"
    val base =  "E:/newcode/spark/aas/data/";
    val rawUserArtistData = spark.read.textFile(base + "user_artist_data.txt")
    val rawArtistData = spark.read.textFile(base + "artist_data.txt")
    val rawArtistAlias = spark.read.textFile(base + "artist_alias.txt")

    val runRecommender = new RunRecommender(spark)
    runRecommender.preparation(rawUserArtistData, rawArtistData, rawArtistAlias)
    runRecommender.model(rawUserArtistData, rawArtistData, rawArtistAlias)
    runRecommender.evaluate(rawUserArtistData, rawArtistAlias)
    runRecommender.recommend(rawUserArtistData, rawArtistData, rawArtistAlias)
  }

}


def preparation(
    rawUserArtistData: Dataset[String],
    rawArtistData: Dataset[String],
    rawArtistAlias: Dataset[String]): Unit = {

  rawUserArtistData.take(5).foreach(println)

  val userArtistDF = rawUserArtistData.map { line =>
    val Array(user, artist, _*) = line.split(' ')
    (user.toInt, artist.toInt)
  }.toDF("user", "artist")

  userArtistDF.agg(min("user"), max("user"), min("artist"), max("artist")).show()

  val artistByID = buildArtistByID(rawArtistData)
  val artistAlias = buildArtistAlias(rawArtistAlias)

  val (badID, goodID) = artistAlias.head
  artistByID.filter($"id" isin (badID, goodID)).show()
}

/**
  * 過濾無效的使用者藝術家ID和名字行,將格式不正確的資料行剔除掉。
  * @param rawArtistData
  * @return
  */
def buildArtistByID(rawArtistData: Dataset[String]): DataFrame = {
  rawArtistData.flatMap { line =>
    val (id, name) = line.span(_ != '\t')
    if (name.isEmpty) {
      None
    } else {
      try {
        Some((id.toInt, name.trim))
      } catch {
        case _: NumberFormatException => None
      }
    }
  }.toDF("id", "name")
}

/**
  * 過濾藝術家id和對應的別名id,將格式拼寫錯誤的行剔除掉。
  * @param rawArtistAlias
  * @return
  */
def buildArtistAlias(rawArtistAlias: Dataset[String]): Map[Int,Int] = {
  rawArtistAlias.flatMap { line =>
    val Array(artist, alias) = line.split('\t')
    if (artist.isEmpty) {
      None
    } else {
      Some((artist.toInt, alias.toInt))
    }
  }.collect().toMap
}

程式碼中模型訓練好之後,預測了使用者 2093760 的推薦結果,我測試結果如下,由於裡面程式碼使用了隨機生成初始矩陣,每個人的結果都有可能不一樣。

Some((2814,50 Cent))
Some((829,Nas))
Some((1003249,Ludacris))
Some((1001819,2Pac))
Some((1300642,The Game))

程式碼中也給出了該使用者以前聽過的藝術家的名字如下:

Some((1180,David Gray))
Some((378,Blackalicious))
Some((813,Jurassic 5))
Some((1255340,The Saw Doctors))
Some((942,Xzibit))

模型評價

auc評價方法

def areaUnderCurve(
    positiveData: DataFrame,
    bAllArtistIDs: Broadcast[Array[Int]],
    predictFunction: (DataFrame => DataFrame)): Double = {

  // What this actually computes is AUC, per user. The result is actually something
  // that might be called "mean AUC".

  // Take held-out data as the "positive".
  // Make predictions for each of them, including a numeric score
  val positivePredictions = predictFunction(positiveData.select("user", "artist")).
    withColumnRenamed("prediction", "positivePrediction")

  // BinaryClassificationMetrics.areaUnderROC is not used here since there are really lots of
  // small AUC problems, and it would be inefficient, when a direct computation is available.

  // Create a set of "negative" products for each user. These are randomly chosen
  // from among all of the other artists, excluding those that are "positive" for the user.
  val negativeData = positiveData.select("user", "artist").as[(Int,Int)].
    groupByKey { case (user, _) => user }.
    flatMapGroups { case (userID, userIDAndPosArtistIDs) =>
      val random = new Random()
      val posItemIDSet = userIDAndPosArtistIDs.map { case (_, artist) => artist }.toSet
      val negative = new ArrayBuffer[Int]()
      val allArtistIDs = bAllArtistIDs.value
      var i = 0
      // Make at most one pass over all artists to avoid an infinite loop.
      // Also stop when number of negative equals positive set size
      while (i < allArtistIDs.length && negative.size < posItemIDSet.size) {
        val artistID = allArtistIDs(random.nextInt(allArtistIDs.length))
        // Only add new distinct IDs
        if (!posItemIDSet.contains(artistID)) {
          negative += artistID
        }
        i += 1
      }
      // Return the set with user ID added back
      negative.map(artistID => (userID, artistID))
    }.toDF("user", "artist")

  // Make predictions on the rest:
  val negativePredictions = predictFunction(negativeData).
    withColumnRenamed("prediction", "negativePrediction")

  // Join positive predictions to negative predictions by user, only.
  // This will result in a row for every possible pairing of positive and negative
  // predictions within each user.
  val joinedPredictions = positivePredictions.join(negativePredictions, "user").
    select("user", "positivePrediction", "negativePrediction").cache()

  // Count the number of pairs per user
  val allCounts = joinedPredictions.
    groupBy("user").agg(count(lit("1")).as("total")).
    select("user", "total")
  // Count the number of correctly ordered pairs per user
  val correctCounts = joinedPredictions.
    filter($"positivePrediction" > $"negativePrediction").
    groupBy("user").agg(count("user").as("correct")).
    select("user", "correct")

  // Combine these, compute their ratio, and average over all users
  val meanAUC = allCounts.join(correctCounts, Seq("user"), "left_outer").
    select($"user", (coalesce($"correct", lit(0)) / $"total").as("auc")).
    agg(mean("auc")).
    as[Double].first()

  joinedPredictions.unpersist()

  meanAUC
}

完整程式碼下載:RunRecommender.scala