
Spark MLlib Logistic Regression Example

Objective:

Implement the logistic regression algorithm on the Spark 1.5.1 platform.

Introduction to logistic regression: link, link 2
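
For reference, logistic regression models the probability of the positive class with the logistic (sigmoid) function:

P(y = 1 | x) = 1 / (1 + exp(-(w·x + b)))

where w is the weight vector and b the intercept; the fit() call in the code below learns w and b from the training data.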

Code:

package sparklr2

import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.sql.Row
import org.apache.spark.sql.SQLContext

object SparkLR2 {

  def main(args: Array[String]): Unit = {
    // Prepare training data from a list of (label, features) tuples.
    val sc = new SparkContext("local", "sparkSQL") // local mode, app name "sparkSQL"
    val sqlContext = new SQLContext(sc)
    // Load the training data.
    // Seq is an ordered collection type.
    // training is a DataFrame.
    val training = sqlContext.createDataFrame(Seq(
      (1.0, Vectors.dense(0.0, 1.1, 0.1)),
      (0.0, Vectors.dense(2.0, 1.0, -1.0)),
      (0.0, Vectors.dense(2.0, 1.3, 1.0)),
      (1.0, Vectors.dense(0.0, 1.2, -0.5))
    )).toDF("label", "features") // convert to a DataFrame with named columns
    // Inspect the training data.
    println("training data")
    training.foreach(println)
    //[1.0,[0.0,1.1,0.1]]
    //[0.0,[2.0,1.0,-1.0]]
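
    // Note (addition, not in the original post): training.show() prints the
    // DataFrame in tabular form and is usually preferable to foreach(println),
    // which only prints predictably in local mode.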

    // Create a LogisticRegression instance. This instance is an Estimator.
    val lr = new LogisticRegression()
    // Print out the parameters, documentation, and any default values.
    println("LogisticRegression parameters:\n" + lr.explainParams() + "\n")

    // Reset model parameters using the setter methods.
    lr.setMaxIter(10)    // maximum number of iterations
      .setRegParam(0.01) // regularization parameter

    // Learn a model by fitting the training data with the parameters set above.
    val model1 = lr.fit(training)
    // Since model1 is a Model (i.e., a Transformer produced by an Estimator),
    // we can view the parameters it used during fit().
    // This prints the parameter (name: value) pairs, where names are unique IDs
    // for this LogisticRegression instance.
    println("Model 1 was fit using parameters: " + model1.parent.extractParamMap)

    // Prepare test data.
    val testData = sqlContext.createDataFrame(Seq(
      (1.0, Vectors.dense(-1.0, 1.5, 1.3)),
      (0.0, Vectors.dense(3.0, 2.0, -0.1)),
      (1.0, Vectors.dense(0.0, 2.2, -1.5)),
      (1.0, Vectors.dense(2.0, 2.2, -1.0))
    )).toDF("label", "features")

    model1.transform(testData)
      .select("features", "label", "probability", "prediction")//選擇資料框的某些列
      .collect()//一般在filter或者足夠小的結果的時候,再用collect封裝返回一個數組
      .foreach { case Row(features: Vector, label: Double, prob: Vector, prediction: Double) =>
        println(s"($features, $label) -> prob=$prob, prediction=$prediction")
      }

    // We may alternatively specify parameters using a ParamMap,
    // which supports several methods for specifying parameters.
    val paramMap = ParamMap(lr.maxIter -> 20)
             .put(lr.maxIter, 30) // Specify 1 Param.  This overwrites the original maxIter.
             .put(lr.regParam -> 0.1, lr.threshold -> 0.9) // Specify multiple Params.
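
    // Addition (not in the original post): a ParamMap can be queried directly.
    // The later put() overrides the earlier value, so maxIter resolves to 30 here.
    println("maxIter in paramMap: " + paramMap(lr.maxIter))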

    // One can also combine ParamMaps.
    val paramMap2 = ParamMap(lr.probabilityCol -> "myProbability") // change the output column name
    val paramMapCombined = paramMap ++ paramMap2 // merge the two ParamMaps

    // Now learn a new model using the paramMapCombined parameters.
    // paramMapCombined overrides all parameters set earlier via lr.set* methods.
    val model2 = lr.fit(training, paramMapCombined)
    println("Model 2 was fit using parameters: " + model2.parent.extractParamMap)

    // Make predictions on test data using the Transformer.transform() method.
    // LogisticRegression.transform will only use the 'features' column.
    // Note that model2.transform() outputs a 'myProbability' column instead of the usual
    // 'probability' column since we renamed the lr.probabilityCol parameter previously.
    model2.transform(testData) // transform() returns a new DataFrame
      .select("features", "label", "myProbability", "prediction")
      .collect() // return all selected rows in an Array
      .foreach { case Row(features: Vector, label: Double, prob: Vector, prediction: Double) =>
         println(s"($features, $label) -> prob=$prob, prediction=$prediction")//自定義輸出資料
      }

    sc.stop()

  }

}
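
To run the example (a sketch, assuming an sbt project compiled against Spark 1.5.1 with Scala 2.10; the jar path below is hypothetical):

spark-submit --class sparklr2.SparkLR2 --master local target/scala-2.10/sparklr2_2.10-1.0.jar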

Results:

Model 1 results


Model 2 results
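
Each prediction line printed by the code has the form below; p0 and p1 stand in for the predicted class probabilities, whose actual values depend on the fit:

([-1.0,1.5,1.3], 1.0) -> prob=[p0,p1], prediction=<0.0 or 1.0>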