1. 程式人生 > >Spark MLlib 入門學習筆記

Spark MLlib 入門學習筆記

關於邏輯迴歸的演算法原理 Spark官方文件裡有說明,另外網上也有中文翻譯文件可參考。本筆記是學習MLlib的輯迴歸API使用時一道練習題記錄,通過這道練習,可以掌握基本使用。MLLib提供了兩種演算法實現,分別是SGD梯度下降法和LBFGS。

1. 資料檔案

交通事故的統計檔案,四列,accident(去年是否出過事故,1表示出過事故,0表示沒有),age(年齡 數值型),vision(視力狀況,分型別,1表示好,0表示有問題),drive(駕車教育,分型別,1表示參加過駕車教育,0表示沒有)。第1列是因變數,其它3列是特徵。這是一個用空格分隔的文字檔案,要使用MLLib演算法庫,首先要讀檔案並轉成LabeledPoint資料型別的RDD。

1 17 1 1
1 44 0 0
1 48 1 0
1 55 0 0
1 75 1 1
0 35 0 1
0 42 1 1
0 57 0 0
0 28 0 1
0 20 0 1
0 38 1 0
0 45 0 1
0 47 1 1
0 52 0 0
0 55 0 1
1 68 1 0
1 18 1 0
1 68 0 0
1 48 1 1
1 17 0 0
1 70 1 1
1 72 1 0
1 35 0 1
1 19 1 0
1 62 1 0
0 39 1 1
0 40 1 1
0 55 0 0
0 68 0 1
0 25 1 0
0 17 0 0
0 45 0 1
0 44 0 1
0 67 0 0
0 55 0 1
1 61 1 0
1 19 1 0
1 69 0 0
1 23 1 1
1 19 0 0
1 72 1 1
1 74 1 0
1 31 0 1
1 16 1 0
1 61 1 0
2. SGD演算法
package classify

/*
accident.txt
accident(去年是否出過事故,1表示出過事故,0表示沒有)
age(年齡 數值型)
vision(視力狀況,分型別,1表示好,0表示有問題)
drive(駕車教育,分型別,1表示參加過駕車教育,0表示沒有)
 */
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.classification.LogisticRegressionWithSGD
import org.apache.spark.{SparkConf, SparkContext}

object LogisticSGD {

  def parseLine(line: String): LabeledPoint = {
    val parts = line.split(" ")
    val vd: Vector = Vectors.dense(parts(1).toDouble, parts(2).toDouble, parts(3).toDouble)
    return LabeledPoint(parts(0).toDouble, vd )
  }


  def main(args: Array[String]){
    val conf = new SparkConf().setMaster(args(0)).setAppName("LogisticSGD")
    val sc = new SparkContext(conf)
    val data =  sc.textFile(args(1)).map(parseLine(_))

    val splits = data.randomSplit(Array(0.6, 0.4), seed=11L)
    val trainData = splits(0)
    val testData = splits(1)

    val model = LogisticRegressionWithSGD.train(trainData, 50)

    println(model.weights.size)
    println(model.weights)
    println(model.weights.toArray.filter(_ != 0).size)

    val predictionAndLabel = testData.map(p => (model.predict(p.features), p.label))

    predictionAndLabel.foreach(println)

  }
}

parseLine函式將文字檔案的每一行轉成一個LabeledPoint資料型別,randomSplit用例把資料集分成訓練和測試兩部分。val model = LogisticRegressionWithSGD.train(trainData, 50) 執行訓練並得到模型,這裡的50為迭代次數。val predictionAndLabel = testData.map(p => (model.predict(p.features), p.label))中的model.predict執行預測,testData.map測試資料集的特徵值傳遞給model去預測,並將預測值與原有的label合併形成一個新的map。

3. LBFGS演算法

package classify

import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.regression.LabeledPoint

object LogisticLBFGS {

  def parseLine(line: String): LabeledPoint = {
    val parts = line.split(" ")
    val vd: Vector = Vectors.dense(parts(1).toDouble, parts(2).toDouble, parts(3).toDouble)
    return LabeledPoint(parts(0).toDouble, vd )
  }

  def main(args: Array[String]){
    val conf = new SparkConf().setMaster(args(0)).setAppName("LogisticLBFGS")
    val sc = new SparkContext(conf)
    val data =  sc.textFile(args(1)).map(parseLine(_))

    val splits = data.randomSplit(Array(0.6, 0.4), seed=11L)
    val trainData = splits(0)
    val testData = splits(1)

    val model = new LogisticRegressionWithLBFGS().setNumClasses(2).run(trainData)

    println(model.weights.size)
    println(model.weights)
    println(model.weights.toArray.filter(_ != 0).size)

    val prediction = testData.map(p => (model.predict(p.features), p.label))

    //println(prediction)
    prediction.foreach(println)

  }
}

val model = new LogisticRegressionWithLBFGS().setNumClasses(2).run(trainData)中的setNumClasses(2)設定分類數。

對於這個列子,LBFGS的效果比SGD的效果好。