1. 程式人生 > >Spark機器學習(10):ALS交替最小二乘算法

Spark機器學習(10):ALS交替最小二乘算法

mllib 測試 con 相互 idt color ted 個人 使用

1. Alternating Least Square

ALS(Alternating Least Square),交替最小二乘法。在機器學習中,特指使用最小二乘法的一種協同推薦算法。如下圖所示,u表示用戶,v表示商品,用戶給商品打分,但是並不是每一個用戶都會給每一種商品打分。比如用戶u6就沒有給商品v3打分,需要我們推斷出來,這就是機器學習的任務。

技術分享

由於並不是每個用戶給每種商品都打了分,可以假設ALS矩陣是低秩的,即一個m*n的矩陣,是由m*k和k*n兩個矩陣相乘得到的,其中k<<m,n。

Am×n=Um×k×Vk×n

這種假設是合理的,因為用戶和商品都包含了一些低維度的隱藏特征,比如我們只要知道某個人喜歡碳酸飲料,就可以推斷出他喜歡百世可樂、可口可樂、芬達,而不需要明確指出他喜歡這三種飲料。這裏的碳酸飲料就相當於一個隱藏特征。上面的公式中,Um

×k表示用戶對隱藏特征的偏好,Vk×n表示產品包含隱藏特征的程度。機器學習的任務就是求出Um×k和Vk×n。可知uiTvj是用戶i對商品j的偏好,使用Frobenius範數來量化重構U和V產生的誤差。由於矩陣中很多地方都是空白的,即用戶沒有對商品打分,對於這種情況我們就不用計算未知元了,只計算觀察到的(用戶,商品)集合R。

技術分享

這樣就將協同推薦問題轉換成了一個優化問題。目標函數中U和V相互耦合,這就需要使用交替二乘算法。即先假設U的初始值U(0),這樣就將問題轉化成了一個最小二乘問題,可以根據U(0)可以計算出V(0),再根據V(0)計算出U(1),這樣叠代下去,直到叠代了一定的次數,或者收斂為止。雖然不能保證收斂的全局最優解,但是影響不大。

2. MLlib的ALS實現

MLlib的ALS采用了數據分區結構,即將U分解成u1,u2,u3,...um,V分解成v1,v2,v3,...vn,相關的u和v存放在同一個分區,從而減少分區間數據交換的成本。比如通過U計算V時,存儲u的分區是P1,P2...,存儲v的分區是Q1,Q2...,需要將不同的u發送給不同的Q,存放這個關系的塊稱作OutBlock;在P中,計算v時需要哪些u,存放這個關系的塊稱作InBlock。

比如R中有a12,a13,a15,u1存放在P1,v2,v3存放在Q2,v5存放在Q3,則需要將P1中的u1發送給Q2和Q3,這個信息存儲在OutBlock;R中有a12

,a32,因此計算v2需要u1和u3,這個信息存儲在InBlock。

直接上代碼:

import org.apache.log4j.{ Level, Logger }
import org.apache.spark.{ SparkConf, SparkContext }
import org.apache.spark.mllib.recommendation.ALS
import org.apache.spark.mllib.recommendation.Rating

/**
  * Created by Administrator on 2017/7/19.
  */
object ALSTest01 {

  def main(args:Array[String]) ={
    // 設置運行環境
    val conf = new SparkConf().setAppName("ALS 01")
      .setMaster("spark://master:7077").setJars(Seq("E:\\Intellij\\Projects\\MachineLearning\\MachineLearning.jar"))
    val sc = new SparkContext(conf)
    Logger.getRootLogger.setLevel(Level.WARN)

    // 讀取樣本數據並解析
    val dataRDD = sc.textFile("hdfs://master:9000/ml/data/test.data")
    val ratingRDD = dataRDD.map(_.split(‘,‘) match {
      case Array(user, item, rate) =>
        Rating(user.toInt, item.toInt, rate.toDouble)
    })

    // 拆分成訓練集和測試集
    val dataParts = ratingRDD.randomSplit(Array(0.8, 0.2))
    val trainingRDD = dataParts(0)
    val testRDD = dataParts(1)

    // 建立ALS交替最小二乘算法模型並訓練
    val rank = 10
    val numIterations = 10
    val alsModel = ALS.train(trainingRDD, rank, numIterations, 0.01)

    // 預測
    val user_product = trainingRDD.map {
      case Rating(user, product, rate) =>
        (user, product)
    }
    val predictions =
      alsModel.predict(user_product).map {
        case Rating(user, product, rate) =>
          ((user, product), rate)
      }

    val ratesAndPredictions = trainingRDD.map {
      case Rating(user, product, rate) =>
        ((user, product), rate)
    }.join(predictions)

    val MSE = ratesAndPredictions.map {
      case ((user, product), (r1, r2)) =>
        val err = (r1 - r2)
        err * err
    }.mean()

    println("Mean Squared Error = " + MSE)

    println("User" + "\t" + "Products" + "\t" + "Rate" + "\t" + "Prediction")
    ratesAndPredictions.collect.foreach(
      rating => {
        println(rating._1._1 + "\t" + rating._1._2 + "\t" + rating._2._1 + "\t" + rating._2._2)
      }
    )

  }

}

其中ALS.train()函數的4個參數分別是訓練用的數據集,特征數量,叠代次數,和正則因子。

運行結果:

技術分享

可見,預測結果還是非常準確的。

Spark機器學習(10):ALS交替最小二乘算法