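Spark Data Mining Example 1: Music Recommendation Based on the Audioscrobbler Dataset
This example comes from the book Advanced Analytics with Spark (《Spark高階資料分析》) and is a good worked example of data mining with Spark. Recommendation engines are a familiar case of large-scale machine learning, and most people have met them while shopping: after you browse or buy some items on Taobao, Taobao recommends similar items based on your preferences. Compared with other machine learning algorithms, a recommendation engine's output is more intuitive, and its recommendations can sometimes be surprisingly good. As the opening article on machine learning, this post gives a systematic walkthrough of music recommendation on the Audioscrobbler dataset.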
Dataset Introduction
The Audioscrobbler dataset is a publicly released dataset. Readers can find it at https://github.com/libaoquan95/aasPractice/tree/master/c3/profiledata_06-May-2005
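Recommendation Algorithm Introduction
The chosen dataset records only the interactions between users and songs, and carries no information beyond artist names. The learning algorithm therefore must not depend on attributes of users or artists; algorithms of this kind are generally called collaborative filtering. Deciding that two users may share preferences because they are the same age is not collaborative filtering. Deciding that two users may both like a song because they have played many of the same songs is collaborative filtering.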
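To make the "played many of the same songs" idea concrete, here is a minimal sketch in plain Scala. It is not the algorithm used in this post (that is alternating least squares, described next); it only measures the overlap between two users' play histories with a Jaccard ratio. The object name `CoPlaySimilarity`, the user IDs and the artist IDs are all made up for illustration.

```scala
object CoPlaySimilarity {
  // Jaccard overlap of two users' played-artist sets: |A intersect B| / |A union B|.
  // Uses no user or artist attributes, only who played what.
  def jaccard(a: Set[Int], b: Set[Int]): Double = {
    val unionSize = a.union(b).size
    if (unionSize == 0) 0.0 else a.intersect(b).size.toDouble / unionSize
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical play histories keyed by user ID (artist IDs are invented).
    val plays = Map(
      1 -> Set(10, 11, 12, 13),
      2 -> Set(10, 11, 12, 99),
      3 -> Set(50, 51)
    )
    println(s"user 1 vs user 2: ${jaccard(plays(1), plays(2))}")
    println(s"user 1 vs user 3: ${jaccard(plays(1), plays(3))}")
  }
}
```

Users 1 and 2 share three of five distinct artists, so a collaborative filter would treat artist 99 as a plausible suggestion for user 1; users 1 and 3 share nothing.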
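The algorithm used in this post is known mathematically as alternating least squares (ALS). Treat the user play data as a matrix A, in which the element at row i, column j records that user i played the music of artist j. A is sparse: the vast majority of its elements are 0. The algorithm factors A into two smaller matrices X and Y such that A ≈ XY^T, where X is the user-feature matrix and Y is the feature-artist matrix. The product of the two matrices serves as an estimate of the user-artist relationship matrix. The following figures illustrate this:
Suppose there are 5 listeners and 5 songs, so A is a 5×5 matrix, with ratings as follows:
Figure 2.1: User subscription matrix
Suppose the number of features d is 3; then X is the following matrix:
Figure 2.2: User-feature matrix
Y is the following matrix:
Figure 2.3: Feature-song matrix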
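Since the figures are not reproduced here, the following sketch shows how the product of the two factor matrices yields the estimate. It assumes Y is stored with one row per song, so entry (i, j) of XY^T is the dot product of row i of X and row j of Y. The object name `FactorProduct` and all numbers are invented, and the example uses 2 users and 3 songs rather than 5×5 to stay short.

```scala
object FactorProduct {
  type Matrix = Array[Array[Double]]

  // Entry (i, j) of X * Y^T: dot product of user i's features and song j's features.
  def estimate(x: Matrix, y: Matrix): Matrix =
    x.map(xi => y.map(yj => xi.zip(yj).map { case (a, b) => a * b }.sum))

  def main(args: Array[String]): Unit = {
    // Made-up factors with d = 3 features.
    val x: Matrix = Array(Array(1.0, 0.0, 2.0), Array(0.0, 1.0, 1.0))                         // 2 users
    val y: Matrix = Array(Array(1.0, 1.0, 0.0), Array(0.0, 2.0, 1.0), Array(3.0, 0.0, 1.0))   // 3 songs
    estimate(x, y).foreach(row => println(row.mkString(" ")))
  }
}
```

Each printed row is one user's estimated affinity for every song; the largest entries not already played are the natural recommendation candidates.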
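In practice, the solver usually starts by fixing Y to random values, so that X = A Y (Y^T Y)^-1. To improve efficiency, the rows of X are usually computed in parallel, that is, X_i = A_i Y (Y^T Y)^-1. Once X is obtained, Y is solved for in turn from X, and the two steps alternate until the squared error between XY^T and A falls below a specified threshold. Iteration then stops, yielding the final X (the user-feature matrix) and Y (the feature-artist matrix), which are then used to make recommendations to users.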
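The alternating procedure can be sketched in plain Scala on a small dense matrix. This is an illustration under stated assumptions, not Spark's ALS implementation: every entry of A is treated as observed, a small ridge term `lambda` is added to Y^T Y purely to keep the linear system solvable (the text above has no such term), and the loop runs a fixed number of iterations instead of checking an error threshold. The names `AlsSketch`, `solveRows` and `factorize` are hypothetical.

```scala
import scala.util.Random

object AlsSketch {
  type Matrix = Array[Array[Double]]

  // Solve the n x n system m * x = b by Gaussian elimination with partial pivoting.
  def solve(m: Matrix, b: Array[Double]): Array[Double] = {
    val n = b.length
    // Augmented matrix [m | b], copied so the inputs are left untouched.
    val aug = Array.tabulate(n, n + 1)((r, c) => if (c < n) m(r)(c) else b(r))
    for (col <- 0 until n) {
      val pivot = (col until n).maxBy(r => math.abs(aug(r)(col)))
      val tmp = aug(col); aug(col) = aug(pivot); aug(pivot) = tmp
      for (r <- col + 1 until n) {
        val f = aug(r)(col) / aug(col)(col)
        for (c <- col to n) aug(r)(c) -= f * aug(col)(c)
      }
    }
    // Back substitution.
    val x = new Array[Double](n)
    for (r <- n - 1 to 0 by -1) {
      val s = (r + 1 until n).map(c => aug(r)(c) * x(c)).sum
      x(r) = (aug(r)(n) - s) / aug(r)(r)
    }
    x
  }

  // With `fixed` (F) held constant, solve one least-squares problem per row a_r of `a`:
  // (F^T F + lambda * I) x_r = F^T a_r. The rows are independent, hence parallelizable.
  def solveRows(a: Matrix, fixed: Matrix, lambda: Double): Matrix = {
    val k = fixed(0).length
    val gram = Array.tabulate(k, k) { (p, q) =>
      fixed.map(row => row(p) * row(q)).sum + (if (p == q) lambda else 0.0)
    }
    a.map { row =>
      val rhs = Array.tabulate(k)(p => fixed.indices.map(j => fixed(j)(p) * row(j)).sum)
      solve(gram, rhs)
    }
  }

  // Sum of squared differences between A and X * Y^T.
  def squaredError(a: Matrix, x: Matrix, y: Matrix): Double =
    (for (i <- a.indices; j <- a(i).indices) yield {
      val est = x(i).zip(y(j)).map { case (p, q) => p * q }.sum
      val d = a(i)(j) - est
      d * d
    }).sum

  // Start from a random Y, then alternate: fix Y and solve X, fix X and solve Y.
  def factorize(a: Matrix, k: Int, iterations: Int, lambda: Double, seed: Long): (Matrix, Matrix) = {
    val rnd = new Random(seed)
    var y: Matrix = Array.fill(a(0).length, k)(rnd.nextDouble())
    var x: Matrix = Array.empty[Array[Double]]
    for (_ <- 0 until iterations) {
      x = solveRows(a, y, lambda)
      y = solveRows(a.transpose, x, lambda)
    }
    (x, y)
  }

  def main(args: Array[String]): Unit = {
    val a: Matrix = Array(Array(1.0, 2.0, 3.0), Array(2.0, 4.0, 6.0)) // made-up rank-1 data
    val (x, y) = factorize(a, k = 1, iterations = 20, lambda = 1e-6, seed = 42L)
    println(s"squared error: ${squaredError(a, x, y)}")
  }
}
```

Updating Y reuses the same row solver on the transpose of A, which is why the two half-steps are symmetric. A threshold-based stop, as described in the text, would replace the fixed iteration count with a check on `squaredError` after each round.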
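Data Preparation
First upload the sample data to HDFS. To test locally instead, you need at least 6 GB of memory, although you can also reduce the amount of data to make a test run feasible.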
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object RunRecommender {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local[*]")
    val spark = SparkSession.builder().config(conf).getOrCreate()
    // Optional, but may help avoid errors due to long lineage
    // spark.sparkContext.setCheckpointDir("hdfs:///tmp/")
    spark.sparkContext.setCheckpointDir("d:///tmp/")

    // val base = "hdfs:///user/ds/"
    val base = "E:/newcode/spark/aas/data/"
    val rawUserArtistData = spark.read.textFile(base + "user_artist_data.txt")
    val rawArtistData = spark.read.textFile(base + "artist_data.txt")
    val rawArtistAlias = spark.read.textFile(base + "artist_alias.txt")

    val runRecommender = new RunRecommender(spark)
    runRecommender.preparation(rawUserArtistData, rawArtistData, rawArtistAlias)
    runRecommender.model(rawUserArtistData, rawArtistData, rawArtistAlias)
    runRecommender.evaluate(rawUserArtistData, rawArtistAlias)
    runRecommender.recommend(rawUserArtistData, rawArtistData, rawArtistAlias)
  }
}
// Method of class RunRecommender; relies on `import spark.implicits._` (for toDF and $"...")
// and on min/max from org.apache.spark.sql.functions.
def preparation(
    rawUserArtistData: Dataset[String],
    rawArtistData: Dataset[String],
    rawArtistAlias: Dataset[String]): Unit = {

  rawUserArtistData.take(5).foreach(println)

  val userArtistDF = rawUserArtistData.map { line =>
    val Array(user, artist, _*) = line.split(' ')
    (user.toInt, artist.toInt)
  }.toDF("user", "artist")

  userArtistDF.agg(min("user"), max("user"), min("artist"), max("artist")).show()

  val artistByID = buildArtistByID(rawArtistData)
  val artistAlias = buildArtistAlias(rawArtistAlias)

  val (badID, goodID) = artistAlias.head
  artistByID.filter($"id" isin (badID, goodID)).show()
}
/**
 * Filters out invalid artist ID and name lines, dropping rows that are not correctly formatted.
 * @param rawArtistData raw lines of artist_data.txt
 * @return DataFrame with columns "id" and "name"
 */
def buildArtistByID(rawArtistData: Dataset[String]): DataFrame = {
  rawArtistData.flatMap { line =>
    val (id, name) = line.span(_ != '\t')
    if (name.isEmpty) {
      None
    } else {
      try {
        Some((id.toInt, name.trim))
      } catch {
        case _: NumberFormatException => None
      }
    }
  }.toDF("id", "name")
}

/**
 * Maps each artist ID to its alias ID, dropping lines with a missing (misformatted) artist ID.
 * @param rawArtistAlias raw lines of artist_alias.txt
 * @return map from artist ID to alias ID
 */
def buildArtistAlias(rawArtistAlias: Dataset[String]): Map[Int,Int] = {
  rawArtistAlias.flatMap { line =>
    val Array(artist, alias) = line.split('\t')
    if (artist.isEmpty) {
      None
    } else {
      Some((artist.toInt, alias.toInt))
    }
  }.collect().toMap
}
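The filtering behavior of these two parsers is easier to see on individual lines without Spark. The sketch below applies the same rules to single strings in plain Scala. `ParseSketch`, `parseArtist` and `parseAlias` are hypothetical names, and the sample lines are invented rather than taken from the dataset. Note one deliberate difference: `parseAlias` here also returns `None` for lines that do not split into exactly two fields or hold non-numeric IDs, whereas the pattern match in `buildArtistAlias` would throw on such a line.

```scala
object ParseSketch {
  // Same rule as buildArtistByID, for one line: split at the first tab,
  // drop the line if there is no name part or the ID is not an integer.
  def parseArtist(line: String): Option[(Int, String)] = {
    val (id, name) = line.span(_ != '\t')
    if (name.isEmpty) {
      None
    } else {
      try {
        Some((id.toInt, name.trim))
      } catch {
        case _: NumberFormatException => None
      }
    }
  }

  // Same idea as buildArtistAlias, but defensive (an assumption of this sketch):
  // anything other than two numeric, tab-separated fields yields None.
  def parseAlias(line: String): Option[(Int, Int)] =
    line.split('\t') match {
      case Array(artist, alias) if artist.nonEmpty =>
        try {
          Some((artist.toInt, alias.toInt))
        } catch {
          case _: NumberFormatException => None
        }
      case _ => None
    }

  def main(args: Array[String]): Unit = {
    // Invented sample lines: one valid, one without a tab, one with a bad ID.
    Seq("101\tSome Artist", "no tab on this line", "abc\tBroken ID").foreach { line =>
      println(s"'$line' -> ${parseArtist(line)}")
    }
    Seq("101\t202", "\t202", "101").foreach { line =>
      println(s"'$line' -> ${parseAlias(line)}")
    }
  }
}
```

Returning `Option` is what lets the Spark versions use `flatMap`: each `None` simply disappears from the resulting dataset.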
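After training the model, the code predicts recommendations for user 2093760. My test results are shown below; because the code initializes the factor matrices randomly, your results may differ.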
Some((2814,50 Cent))
Some((829,Nas))
Some((1003249,Ludacris))
Some((1001819,2Pac))
Some((1300642,The Game))
The code also prints the names of artists this user has listened to before:
Some((1180,David Gray))
Some((378,Blackalicious))
Some((813,Jurassic 5))
Some((1255340,The Saw Doctors))
Some((942,Xzibit))
Model Evaluation
AUC evaluation method
// Needs scala.util.Random, scala.collection.mutable.ArrayBuffer,
// org.apache.spark.broadcast.Broadcast and org.apache.spark.sql.functions._
def areaUnderCurve(
    positiveData: DataFrame,
    bAllArtistIDs: Broadcast[Array[Int]],
    predictFunction: (DataFrame => DataFrame)): Double = {

  // What this actually computes is AUC, per user. The result is actually something
  // that might be called "mean AUC".

  // Take held-out data as the "positive".
  // Make predictions for each of them, including a numeric score
  val positivePredictions = predictFunction(positiveData.select("user", "artist")).
    withColumnRenamed("prediction", "positivePrediction")

  // BinaryClassificationMetrics.areaUnderROC is not used here since there are really lots of
  // small AUC problems, and it would be inefficient, when a direct computation is available.

  // Create a set of "negative" products for each user. These are randomly chosen
  // from among all of the other artists, excluding those that are "positive" for the user.
  val negativeData = positiveData.select("user", "artist").as[(Int,Int)].
    groupByKey { case (user, _) => user }.
    flatMapGroups { case (userID, userIDAndPosArtistIDs) =>
      val random = new Random()
      val posItemIDSet = userIDAndPosArtistIDs.map { case (_, artist) => artist }.toSet
      val negative = new ArrayBuffer[Int]()
      val allArtistIDs = bAllArtistIDs.value
      var i = 0
      // Make at most one pass over all artists to avoid an infinite loop.
      // Also stop when number of negative equals positive set size
      while (i < allArtistIDs.length && negative.size < posItemIDSet.size) {
        val artistID = allArtistIDs(random.nextInt(allArtistIDs.length))
        // Only add IDs that are not positive for this user (duplicates are possible)
        if (!posItemIDSet.contains(artistID)) {
          negative += artistID
        }
        i += 1
      }
      // Return the negatives with user ID added back
      negative.map(artistID => (userID, artistID))
    }.toDF("user", "artist")

  // Make predictions on the rest:
  val negativePredictions = predictFunction(negativeData).
    withColumnRenamed("prediction", "negativePrediction")

  // Join positive predictions to negative predictions by user, only.
  // This will result in a row for every possible pairing of positive and negative
  // predictions within each user.
  val joinedPredictions = positivePredictions.join(negativePredictions, "user").
    select("user", "positivePrediction", "negativePrediction").cache()

  // Count the number of pairs per user
  val allCounts = joinedPredictions.
    groupBy("user").agg(count(lit("1")).as("total")).
    select("user", "total")
  // Count the number of correctly ordered pairs per user
  val correctCounts = joinedPredictions.
    filter($"positivePrediction" > $"negativePrediction").
    groupBy("user").agg(count("user").as("correct")).
    select("user", "correct")

  // Combine these, compute their ratio, and average over all users
  val meanAUC = allCounts.join(correctCounts, Seq("user"), "left_outer").
    select($"user", (coalesce($"correct", lit(0)) / $"total").as("auc")).
    agg(mean("auc")).
    as[Double].first()

  joinedPredictions.unpersist()

  meanAUC
}

Full code download: RunRecommender.scala
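The quantity that `areaUnderCurve` computes can be reproduced on toy numbers without Spark. For each user it is the fraction of (positive, negative) score pairs in which the held-out positive artist scores strictly higher than the sampled negative one, averaged over users. The sketch below is a plain-Scala restatement under that reading; `MeanAucSketch` and the scores are made up, and users lacking either positives or negatives are skipped, mirroring how the join in the Spark code produces no rows for them.

```scala
object MeanAucSketch {
  // Per-user AUC: share of (positive, negative) pairs ranked in the right order.
  // Assumes both sequences are non-empty.
  def userAuc(positive: Seq[Double], negative: Seq[Double]): Double = {
    val total = positive.size.toLong * negative.size
    val correct = positive.map(p => negative.count(n => p > n).toLong).sum
    correct.toDouble / total
  }

  // Mean of the per-user AUC values; users without any pair are ignored.
  def meanAuc(scoresByUser: Map[Int, (Seq[Double], Seq[Double])]): Double = {
    val aucs = scoresByUser.values.collect {
      case (pos, neg) if pos.nonEmpty && neg.nonEmpty => userAuc(pos, neg)
    }
    aucs.sum / aucs.size
  }

  def main(args: Array[String]): Unit = {
    // Invented prediction scores: (scores of held-out positives, scores of sampled negatives).
    val scores = Map(
      1 -> (Seq(0.9, 0.4), Seq(0.5, 0.1)),
      2 -> (Seq(0.2), Seq(0.8))
    )
    println(s"mean AUC: ${meanAuc(scores)}")
  }
}
```

A value near 0.5 means the model ranks held-out artists no better than random picks, while a value near 1.0 means held-out artists almost always outrank the random negatives.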