1. 程式人生 > >scala應用-基於user協同過濾的推薦系統

scala應用-基於user協同過濾的推薦系統

以下是一個只用 Scala 語言開發的推薦系統,可供參考。它能處理一些小資料集,當然也可以改造成多執行緒版本。實踐表明,對於百 MB 級別的資料,在相同配置的機器上,多執行緒與分散式運算在耗時上並沒有很大的差別。若要改造成多執行緒,需要研究 Scala 原始碼中的執行緒池物件(和 Java 的很類似)。

SimilarityMetrics.scala:

package com.glad.ml

object SimilarityMetrics {
  /**
   * Calculates a Pearson-style correlation between two sparse rating vectors.
   *
   * Each map goes from item id to rating. The mean of each vector is taken
   * over the ratings present in that map only. An item that appears in just
   * one of the maps is treated as a rating of 0.0 in the other one.
   *
   * @param u ratings of the first user, keyed by item id
   * @param v ratings of the second user, keyed by item id
   * @return the correlation (between -1 and 1, up to rounding), or 0.0 when
   *         it is undefined: either map is empty, or one side has zero
   *         variance
   */
  def pearsonCorrelation(u: Map[Int, Double], v: Map[Int, Double]): Double = {
    if (u.isEmpty || v.isEmpty) {
      // A mean over zero ratings would be NaN; report "no correlation" instead.
      0.0
    } else {
      val xMean = u.values.sum / u.size
      val yMean = v.values.sum / v.size

      // Convert the key set to a Seq before mapping: mapping a Set yields a
      // Set, which would silently drop identical (dx, dy) pairs that belong
      // to different items and distort the sums below.
      val diffValues = (u.keySet ++ v.keySet).toSeq.map { key =>
        // what to do with missing values? For now they count as 0.0.
        val x = u.getOrElse(key, 0.0)
        val y = v.getOrElse(key, 0.0)
        (x - xMean, y - yMean)
      }

      val (cov, sx, sy) = diffValues.foldLeft((0.0, 0.0, 0.0)) {
        case ((covAcc, sxAcc, syAcc), (dx, dy)) =>
          (covAcc + dx * dy, sxAcc + dx * dx, syAcc + dy * dy)
      }

      // pearson correlation; a zero denominator means one side is constant
      val denominator = Math.sqrt(sx) * Math.sqrt(sy)
      if (denominator == 0.0) 0.0 else cov / denominator
    }
  }
}

UserNeighborhoodRecommender.scala:

package com.glad.ml

import scala.io.Source
import java.io.File
import scala.collection.immutable.TreeMap

object UserNeighborhoodRecommender {

  /**
   * Recommends items to user `u` with user-based collaborative filtering.
   *
   * Outline:
   *  - for every other user w, compute a similarity s between u and w
   *  - retain the top users, ranked by similarity, as a neighbourhood
   *  - for every item rated inside the neighbourhood but not by u, average
   *    the neighbours' preferences for it, each multiplied by that
   *    neighbour's similarity to u
   *  - return the items ranked by that average, best first
   *
   * The average divides by the number of neighbours who rated the item,
   * not by the sum of their similarities.
   *
   * @param preferences        ratings per user: user id -> (item id -> preference);
   *                           must contain `u` and every id in `userIds`
   * @param itemIds            all known item ids (currently not consulted)
   * @param userIds            all user ids to consider as potential neighbours
   * @param similarityFunction similarity between two users' rating maps
   * @param u                  the user to produce recommendations for
   * @param topN               neighbourhood size; defaults to 2
   * @return (item id, score) pairs sorted by descending score
   */
  def recommender(
      preferences: Map[Long, Map[Int, Double]],
      itemIds: Set[Int],
      userIds: Set[Long],
      similarityFunction: ((Map[Int, Double], Map[Int, Double]) => Double),
      u: Long,
      topN: Int = 2): List[Tuple2[Int, Double]] = {

    val targetPrefs = preferences(u)

    // Keep each neighbour's similarity next to its id so it is computed
    // exactly once per candidate user and reused when weighting below.
    val neighbours: List[(Long, Double)] = userIds
      .filterNot(_ == u)
      .toList
      .map(w => (w, similarityFunction(targetPrefs, preferences(w))))
      .sortBy(_._2)
      .reverse
      .take(topN)

    // find items rated by a neighbour and not by u; distinct so an item
    // shared by several neighbours is processed only once
    val itemsInNeighbourhood: List[Int] = neighbours.flatMap { case (neighbour, _) =>
      preferences(neighbour).keys.filterNot(targetPrefs.contains)
    }.distinct

    // one (item, similarity * preference) entry per neighbour who rated the item
    val weightedPreferences: List[(Int, Double)] = itemsInNeighbourhood.flatMap { item =>
      neighbours.collect {
        case (v, sim) if preferences(v).contains(item) => (item, sim * preferences(v)(item))
      }
    }

    val recommendedItems = weightedPreferences
      .groupBy(_._1)
      .map { case (item, weightedPrefs) =>
        val sum = weightedPrefs.foldLeft(0.0)((acc, entry) => acc + entry._2)
        (item, sum / weightedPrefs.size)
      }
      .toList
      .sortBy(_._2)
      .reverse

    recommendedItems
  }
}

UserNeighborhoodRecommenderMain.scala:

package com.glad.ml

import scala.io.Source
import java.io.File
import scala.collection.immutable.TreeMap

object UserNeighborhoodRecommenderMain {

  /**
   * Command-line entry point: loads a preference file and prints
   * recommendations for every user found in it.
   *
   * The input is read as comma-separated lines of the form
   * `userId,itemId,preference` (Long, Int, Double). Lines that do not split
   * into exactly three fields are skipped. If the same user/item pair occurs
   * more than once, the last occurrence wins.
   *
   * @param args optional first element: path of the input file
   *             (defaults to "train.csv" in the current directory)
   */
  def main(args: Array[String]): Unit = {
    val inputFile = if (args.length > 0) args(0) else "train.csv"
    val modelFile = new File(inputFile)
    if (!modelFile.exists()) {
      println("Please, specify name of file, or put file 'train.csv' into current directory!")
      System.exit(1)
    }

    val src = Source.fromFile(modelFile)
    // getLines() is lazy, so the fold must finish before the source is
    // closed; try/finally releases the file handle even if parsing fails.
    val preferences =
      try {
        src
          .getLines()
          .map { line => line.split(",") }
          .filter { fields => fields.length == 3 }
          .map { fields => (fields(0).toLong, fields(1).toInt, fields(2).toDouble) }
          .foldLeft(TreeMap[Long, TreeMap[Int, Double]]()) { (acc, entry) =>
            val (userId, itemId, preference) = entry
            val ratings = acc.getOrElse(userId, TreeMap[Int, Double]())
            acc + (userId -> (ratings + (itemId -> preference)))
          }
      } finally {
        src.close()
      }

    val userIds = preferences.keySet
    // reduceOption: an empty (or fully malformed) file yields no users, and
    // a plain reduce would throw on the empty collection.
    val itemIds = preferences.values
      .map(x => x.keySet)
      .reduceOption((x, y) => x union y)
      .getOrElse(scala.collection.immutable.SortedSet.empty[Int])
    println(userIds)
    println(itemIds)
    preferences foreach println

    val recList = userIds.toList.map { userId =>
      val rs = UserNeighborhoodRecommender.recommender(preferences, itemIds, userIds,
        SimilarityMetrics.pearsonCorrelation, userId)
      println("Recommendations" + userId + ": ")
      rs foreach println
      (userId, rs)
    }

    // TODO: evaluate recommendation quality by comparing recList against test data
  }
}

如果看過我另外一篇文章《Spark RDD轉換成其他資料型別》,就知道這兩篇是有關聯的。因為當初我很傻很天真地以為,只要是 Scala 程式跑在 Spark 上就會自動分散式執行,然而並不是;想要在 Spark 上做分散式運算,還是老老實實用 RDD 吧。而用 RDD 編碼時,這種思路是行不通的…