package com.glad.ml

object SimilarityMetrics {

  /**
   * Calculate the Pearson correlation between two sparse vectors.
   *
   * Each vector is centred on the mean of its own stored values. The sums are then taken
   * over the union of both key sets; a key that is missing from one vector contributes a
   * raw value of 0.0 for that vector.
   *
   * @param u first vector (key -> value)
   * @param v second vector (key -> value)
   * @return the correlation coefficient, or 0.0 when it is undefined
   *         (either vector is empty, or either one has zero variance)
   */
  def pearsonCorrelation(u: Map[Int, Double], v: Map[Int, Double]): Double =
    if (u.isEmpty || v.isEmpty) {
      // A mean over zero values would be NaN; report "no correlation" instead.
      0.0
    } else {
      val xMean = u.values.sum / u.size
      val yMean = v.values.sum / v.size

      // Fold over the set union directly so that every key contributes exactly once.
      // Building an intermediate collection of (dx, dy) pairs is avoided on purpose: depending
      // on the collection type it can merge identical pairs or repeat shared keys.
      val allKeys = u.keySet ++ v.keySet

      val (cov, sx, sy) = allKeys.foldLeft((0.0, 0.0, 0.0)) { case ((c, xx, yy), key) =>
        // what to do with missing values? For now they are treated as a value of 0.0.
        val dx = u.getOrElse(key, 0.0) - xMean
        val dy = v.getOrElse(key, 0.0) - yMean
        (c + dx * dy, xx + dx * dx, yy + dy * dy)
      }

      // pearson correlation; a zero denominator means one side has no variance.
      val denominator = Math.sqrt(sx) * Math.sqrt(sy)
      if (denominator == 0.0) 0.0 else cov / denominator
    }
}


package com.glad.ml

import scala.io.Source
import java.io.File
import scala.collection.immutable.TreeMap

object UserNeighborhoodRecommender {

  /**
   * User-based neighbourhood recommender.
   *
   * Algorithm:
   *  - for every other user w, compute a similarity s between u and w
   *  - retain the top users, ranked by similarity, as a neighbourhood n
   *  - for every item i rated inside the neighbourhood but not by u:
   *    - for every neighbour v that has a preference for i, weight v's preference by
   *      the similarity between u and v
   *    - average those weighted preferences (sum divided by the number of raters)
   *  - return the items ranked by that average, highest first
   *
   * @param preferences        userId -> (itemId -> preference); must contain `u` and every
   *                           id in `userIds`, otherwise the lookup throws NoSuchElementException
   * @param itemIds            all known item ids (currently not used by the computation)
   * @param userIds            the users to consider as potential neighbours
   * @param similarityFunction similarity between two preference maps; higher means more similar
   * @param u                  the user to produce recommendations for
   * @param neighbourhoodSize  how many of the most similar users form the neighbourhood
   * @return (itemId, average weighted preference) pairs, sorted by descending score
   */
  def recommender(
    preferences: Map[Long, Map[Int, Double]],
    itemIds: Set[Int],
    userIds: Set[Long],
    similarityFunction: ((Map[Int, Double], Map[Int, Double]) => Double),
    u: Long,
    neighbourhoodSize: Int = 2): List[Tuple2[Int, Double]] = {

    val ownPreferences = preferences(u)

    // Similarity is computed once per candidate and carried along with the user id,
    // so it does not have to be recomputed for every (item, neighbour) pair below.
    val neighbours: List[(Long, Double)] = userIds
      .filterNot(_ == u)
      .toList
      .map(w => (w, similarityFunction(ownPreferences, preferences(w))))
      // NaN sorts as the greatest Double; without this filter an undefined similarity
      // would be ranked first and turn every derived score into NaN.
      .filterNot { case (_, sim) => sim.isNaN }
      .sortBy(_._2)
      .reverse
      .take(neighbourhoodSize)

    // find items rated by a neighbour and not by u (each item only once)
    val candidateItems: List[Int] = neighbours
      .flatMap { case (v, _) => preferences(v).keys.filterNot(ownPreferences.contains) }
      .distinct

    val recommendedItems = candidateItems.map { i =>
      val weightedPrefs = neighbours.collect {
        case (v, sim) if preferences(v).contains(i) => sim * preferences(v)(i)
      }
      // Never empty: every candidate item came from at least one neighbour.
      (i, weightedPrefs.sum / weightedPrefs.size)
    }

    recommendedItems.sortBy(_._2).reverse
  }
}


package com.glad.ml

import scala.io.Source
import java.io.File
import scala.collection.immutable.TreeMap

object UserNeighborhoodRecommenderMain {

  /**
   * Command-line entry point.
   *
   * Reads a CSV file whose lines have the form `userId,itemId,preference` (lines that do not
   * split into exactly three fields are skipped), prints the parsed preferences, and then
   * prints the recommendations for every user found in the file.
   *
   * @param args optional single argument: path of the input file (defaults to "train.csv")
   */
  def main(args: Array[String]): Unit = {
    val inputFile = if (args.length > 0) args(0) else "train.csv"
    val modelFile = new File(inputFile)
    if (!modelFile.exists()) {
      println("Please, specify name of file, or put file 'train.csv' into current directory!")
      // Nothing can be computed without the input; stop here instead of failing later.
      sys.exit(1)
    }

    val src = Source.fromFile(modelFile)
    val preferences =
      try {
        src
          // Source itself iterates characters; getLines() yields the text lines.
          .getLines()
          .map { line => line.split(",") }
          .filter { e => e.length == 3 }
          .map { e => (e(0).toLong, e(1).toInt, e(2).toDouble) }
          .foldLeft(TreeMap[Long, TreeMap[Int, Double]]()) { (m, e) =>
            val (userId, itemId, preference) = e
            val values = m.getOrElse(userId, TreeMap[Int, Double]())
            m + (userId -> (values + (itemId -> preference)))
          }
      } finally {
        // The fold above has consumed the file eagerly, so it is safe to close it now.
        src.close()
      }

    val userIds = preferences.keySet
    // foldLeft rather than reduce: an input without any valid line yields an empty set
    // instead of an exception.
    val itemIds = preferences.values.foldLeft(Set.empty[Int])((acc, x) => acc union x.keySet)
    preferences foreach println

    // Mapped over a List so the result does not need an Ordering for its elements.
    val recList = userIds.toList.map { userId =>
      val rs = UserNeighborhoodRecommender.recommender(preferences, itemIds, userIds,
        SimilarityMetrics.pearsonCorrelation, userId)
      println("Recommendations" + userId + ": ")
      rs foreach println
      (userId, rs)
    }

    // TODO: evaluate the recommendation quality by comparing recList with testData
  }
}

如果看過我另外一篇文章《Spark RDD轉換成其他資料型別》,就知道這兩篇是有關聯的。因為當初我很傻很天真地以為,只要是用 Scala 寫的程式,在 Spark 上就會自動分散式執行,然而並不是;想要在 Spark 上做分散式運算,還是老實用 RDD 吧。而用 RDD 寫程式時,這種思路是行不通的…