
[Practice] A Spark-based CF Implementation and Its Optimization

A recent project needed item-based collaborative filtering. I tried the ALS implementation in Spark MLlib, but because it involves a dimensionality-reduction step, its performance on large data volumes leaves much to be desired.

So I implemented a distributed CF on Spark myself and have already applied some optimizations. The observed runtime performance looks reasonably good. The code is below.

package model

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.hive.HiveContext

/**
  * Created by dengxing on 2017/7/18.
  */
object CF {

  /** Fetch the raw data source for the dt partition of the given day
    *
    * @param sc    SparkContext
    * @param table Hive table to read from
    * @param day   date (dt partition) to load
    * @return DataFrame of the raw data
    */
  def getResource(sc: SparkContext, table: String, day: String) = {
    val hiveContext = new HiveContext(sc)
    import hiveContext.sql
    val resource = sql("select "
      + "uid,"
      + "aid,"
      + "cnt"
      + " from " + table + " where dt ='" + day + "'")
    resource
  }
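
  // Note (assumption inferred from the query above): the Hive table is expected to expose
  // columns (uid string, aid string, cnt string) and to be partitioned by dt; the functions
  // below read these columns positionally via getString(0..2).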

  /**
    * Compute cosine similarity in a distributed way
    * --------------------------------
    *          user1     user2
    * item1   score11   score21   (X)
    * item2   score12   score22   (Y)
    * --------------------------------
    * sim(item1,item2) = XY / (math.sqrt(XX) * math.sqrt(YY))
    * XY = score11 * score12 + score21 * score22
    * XX = score11 * score11 + score21 * score21
    * YY = score12 * score12 + score22 * score22
    *
    * @param resource raw (uid, aid, cnt) DataFrame
    * @return RDD[(item1, (item2, sim))]
    */
  def getCosineSimilarity(resource: DataFrame): RDD[(String, (String, Double))] = {
    val rating = resource.map {
      row => {
        val uid = row.getString(0)
        val aid = row.getString(1)
        val score = row.getString(2).toDouble
        (uid, aid, score)
      }
    }
    //RDD[(uid,(aid,score))]
    val user_item_score = rating.map(f => (f._1, (f._2, f._3)))
    /*
     * For each user, emit every pair of items that user has interacted with, i.e.
     * RDD[((aid1,aid2),(score11,score12))] for user1
     */
    val item_score_pair = user_item_score.join(user_item_score)
      .map(f => ((f._2._1._1, f._2._2._1), (f._2._1._2, f._2._2._2)))
    /*
     * For each item pair, sum the per-user score products to get the dot products XY, XX and YY:
     * RDD[((aid1,aid2),score11 * score12 + score21 * score22)]
     * and RDD[((aid1,aid1),score11 * score11 + score21 * score21)]
     * and RDD[((aid2,aid2),score12 * score12 + score22 * score22)]
     */
    val item_pair_ALL = item_score_pair.map(f => (f._1, f._2._1 * f._2._2)).reduceByKey(_ + _)
    /*
     * For each item, the dot product of its score vector with itself, i.e. XX or YY:
     * RDD[((aid1,aid1),score11 * score11 + score21 * score21)]
     * or RDD[((aid2,aid2),score12 * score12 + score22 * score22)]
     */
    val item_pair_XX_YY = item_pair_ALL.filter(f => f._1._1 == f._1._2)
    /*
     * For each pair of distinct items, the cross dot product, i.e. XY:
     * RDD[((aid1,aid2),score11 * score12 + score21 * score22)]
     */
    val item_pair_XY = item_pair_ALL.filter(f => f._1._1 != f._1._2)
    /*
     * From item_pair_XX_YY, extract each item and its XX or YY:
     * RDD[(aid1,score11 * score11 + score21 * score21)]
     * or RDD[(aid2,score12 * score12 + score22 * score22)]
     */
    val item_XX_YY = item_pair_XX_YY.map(f => (f._1._1, f._2))
    /*
     * Re-key item_pair_XY and join with item_XX_YY to get (aid1,((aid1,aid2,XY),XX)):
     * RDD[(aid1,((aid1,aid2,score11 * score12 + score21 * score22),score11 * score11 + score21 * score21))]
     */
    val item_XY_XX = item_pair_XY.map(f => (f._1._1, (f._1._1, f._1._2, f._2))).join(item_XX_YY)
    /*
     * Re-key item_XY_XX and join with item_XX_YY to get (aid2,((aid1,aid2,XY,XX),YY)):
     * RDD[(aid2,((aid1,aid2,score11 * score12 + score21 * score22,score11 * score11 + score21 * score21),score12 * score12 + score22 * score22))]
     */
    val item_XY_XX_YY = item_XY_XX.map(f => (f._2._1._2, (f._2._1._1, f._2._1._2, f._2._1._3, f._2._2))).join(item_XX_YY)
    /*
     * From item_XY_XX_YY, extract (aid1,aid2,XY,XX,YY):
     * RDD[(aid1,aid2,score11 * score12 + score21 * score22,score11 * score11 + score21 * score21,score12 * score12 + score22 * score22)]
     */
    val item_pair_XY_XX_YY = item_XY_XX_YY.map(f => (f._2._1._1, f._2._1._2, f._2._1._3, f._2._1._4, f._2._2))
    /*
     * Convert item_pair_XY_XX_YY into (aid1,(aid2,XY / math.sqrt(XX * YY))):
     * RDD[(aid1,(aid2,(score11 * score12 + score21 * score22) / math.sqrt((score11 * score11 + score21 * score21) * (score12 * score12 + score22 * score22))))]
     */
    val item_pair_sim = item_pair_XY_XX_YY.map(f => (f._1, (f._2, f._3 / math.sqrt(f._4 * f._5))))
    item_pair_sim
  }
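
  /*
   * A small worked example of the math above (toy numbers, not from the original post):
   * user1 rates item1 = 1.0 and item2 = 2.0; user2 rates item1 = 2.0 and item2 = 1.0.
   * Then X = (1.0, 2.0), Y = (2.0, 1.0),
   * XY = 1.0 * 2.0 + 2.0 * 1.0 = 4.0, XX = 1.0 + 4.0 = 5.0, YY = 4.0 + 1.0 = 5.0,
   * so sim(item1,item2) = 4.0 / (math.sqrt(5.0) * math.sqrt(5.0)) = 0.8,
   * and getCosineSimilarity emits both ("item1",("item2",0.8)) and ("item2",("item1",0.8)).
   */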

  /**
    * Generate a topN recommendation list for each user from the item similarity matrix
    *
    * @param resource    raw (uid, aid, cnt) DataFrame
    * @param item_sim_bd broadcast map from each item to its most similar items with similarities
    * @param topN        number of items to recommend per user
    * @return RDD[(user,List[(item,score)])]
    */
  def recommend(resource: DataFrame, item_sim_bd: Broadcast[scala.collection.Map[String, List[(String, Double)]]], topN: Int = 50) = {
    val user_item_score = resource.map(
      row => {
        val uid = row.getString(0)
        val aid = row.getString(1)
        val score = row.getString(2).toDouble
        ((uid, aid), score)
      }
    )
    /*
     * Expand each (user,item) rating into ((user,item2),sim * score) for every item2 similar to item:
     * RDD[((user,item2),sim * score)]
     */
    val user_item_simscore = user_item_score.flatMap(
      f => {
        val items_sim = item_sim_bd.value.getOrElse(f._1._2, List(("0", 0.0)))
        for (w <- items_sim) yield ((f._1._1, w._1), w._2 * f._2)
      }).filter(_._2 > 0.03)

    /*
     * Aggregate user_item_simscore into ((user,item2),sim1 * score1 + sim2 * score2).
     * Suppose the user has watched two items with scores score1 and score2, and item2 is similar
     * to both of them, with similarities sim1 and sim2 respectively.
     * RDD[((user,item2),sim1 * score1 + sim2 * score2)]
     */
    val user_item_rank = user_item_simscore.reduceByKey(_ + _, 1000)
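    /*
     * Toy illustration of the aggregation above (numbers are assumptions, not from the post):
     * if a user watched itemA with score 2.0 and itemB with score 3.0, and a candidate item2
     * has sim(itemA,item2) = 0.5 and sim(itemB,item2) = 0.2, then
     * rank(user,item2) = 0.5 * 2.0 + 0.2 * 3.0 = 1.6.
     */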
    /*
     * Filter out items the user has already watched, then group user_item_rank by user:
     * RDD[(user,CompactBuffer((item2,rank2),(item3,rank3)...))]
     */
    val user_items_ranks = user_item_rank.subtractByKey(user_item_score).map(f => (f._1._1, (f._1._2, f._2))).groupByKey()
    /*
     * Sort user_items_ranks by rank in descending order and keep the topN
     * (items the user has already watched were removed above):
     * RDD[(user,List((item2,rank2),...,(itemN,rankN)))]
     */
    val user_items_ranks_desc = user_items_ranks.map(f => {
      val item_rank_list = f._2.toList
      val item_rank_desc = item_rank_list.sortWith((x, y) => x._2 > y._2)
      (f._1, item_rank_desc.take(topN))
    })
    user_items_ranks_desc
  }

  /**
    * Encode one recommendation result as a JSON string
    *
    * @param recTopN offline recommendation result for a single user
    */
  def encodeToJson(recTopN: (String, List[(String, Double)])) = {
    val mtype = "u2a"
    val mtype_ = "\"" + "mtype" + "\"" + ":" + "\"" + mtype + "\""
    val uid = recTopN._1
    val uid_ = "\"" + "uid" + "\"" + ":" + "\"" + uid + "\""
    val aid_score = recTopN._2
    val aids_ = new StringBuilder().append("\"" + "aids" + "\"" + ":[")
    for (v <- aid_score) {
      val aid = v._1
      val score = v._2
      val aid_score = "[" + "\"" + aid + "\"" + "," + score + "]"
      aids_.append(aid_score + ",")
    }
    aids_.deleteCharAt(aids_.length - 1).append("]")
    val result = "{" + mtype_ + "," + uid_ + "," + aids_.toString() + "}"
    result
  }
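
  // Example (hypothetical input): encodeToJson(("u1", List(("a1", 0.9), ("a2", 0.5)))) returns
  // {"mtype":"u2a","uid":"u1","aids":[["a1",0.9],["a2",0.5]]}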

  def main(args: Array[String]): Unit = {
    val table = args(0) // table to process
    val day = args(1) // current date (dt partition)
    val output = args(2) // output path for the CF similarity matrix

    val sparkConf = new SparkConf().setAppName("Wireless ItemBased Collaborative Filtering")
    val sc = new SparkContext(sparkConf)

    val resource = getResource(sc, table, day).repartition(500)
    resource.cache()

    // 1. Compute the item similarity matrix
    val item_sim: RDD[(String, (String, Double))] = getCosineSimilarity(resource)
    item_sim.cache()

    // 2. Save the CF similarity matrix to HDFS
    item_sim.saveAsTextFile(output)

    // 3. For each item, keep the 40 most similar items
    val item_sim_rdd = item_sim.filter(f => f._2._2 > 0.05).groupByKey().map(
      f => {
        val item = f._1
        val items_score = f._2.toList
        val items_score_desc = items_score.sortWith((x, y) => x._2 > y._2)
        (item, items_score_desc.take(40))
      }).collectAsMap()

    // 4. Broadcast the similarity matrix
    val item_sim_bd: Broadcast[scala.collection.Map[String, List[(String, Double)]]] = sc.broadcast(item_sim_rdd)

    // 5. Generate recommendation lists for users
    val recTopN = recommend(resource, item_sim_bd, 50)

    recTopN.map(encodeToJson(_)).take(10).foreach(println)


  }
}
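
For reference, below is a minimal local-mode sketch showing how getCosineSimilarity could be exercised on toy data. The test object name, the SQLContext-based toy DataFrame, and the local[2] master are my assumptions for illustration; the real job reads its input from Hive via getResource.

package model

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object CFLocalTest {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("CF local test").setMaster("local[2]"))
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    // Toy (uid, aid, cnt) data mirroring the schema returned by getResource
    val resource = sc.parallelize(Seq(
      ("u1", "item1", "1"), ("u1", "item2", "2"),
      ("u2", "item1", "2"), ("u2", "item2", "1")
    )).toDF("uid", "aid", "cnt")

    // Expected to include ("item1",("item2",0.8)) and ("item2",("item1",0.8))
    CF.getCosineSimilarity(resource).collect().foreach(println)

    sc.stop()
  }
}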



Note: for program-level optimization of Spark jobs, I strongly recommend using