1. 程式人生 > >從examples中學spark(二):ModelSelectionViaCrossValidationExample.scala以及模型儲存

從examples中學spark(二):ModelSelectionViaCrossValidationExample.scala以及模型儲存

學習前(理論)

最小二乘法、嶺迴歸、Lasso等

學習中(領悟)

1.首先是一段包匯入,跳過

import org.apache.log4j.{Level, Logger}

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache
.spark.ml.feature.{HashingTF, Tokenizer} import org.apache.spark.ml.linalg.Vector import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder} import org.apache.spark.sql.Row import org.apache.spark.sql.SparkSession /** * A simple example demonstrating model selection using CrossValidator. * This example also demonstrates how Pipelines are Estimators. * * Run with * {{{ * bin/run-example ml.ModelSelectionViaCrossValidationExample * }}} * */

2.看一下run-example這個指令碼:

if [ -z "${SPARK_HOME}" ]; then
  source "$(dirname "$0")"/find-spark-home
fi

export _SPARK_CMD_USAGE="Usage: ./bin/run-example [options] example-class [example args]"
exec "${SPARK_HOME}"/bin/spark-submit run-example "[email protected]"
  • 首先,[ -z “${SPARK_HOME}” ]判斷當前環境有沒有設定SPARK_HOME,(-z判斷後面引數的長度是否為0)。
  • dirname取指定路徑所在的目錄,保留最後一個/前面的字元,刪除其他部分,並寫結果到標準輸出。如果最後一個/後無字元,dirname 命令使用倒數第二個/,並忽略其後的所有字元。
  • $0表示當前執行的命令名,source “(dirname"0”)”/find-spark-home就是執行find-spark-home的指令碼。source不會建立新程序。
  • 檢視find-spark-home。就是想方設法export SPARK_HOME
  • 然後執行spark-submit指令碼,spark-submit會呼叫spark-class執行org.apache.spark.deploy.SparkSubmit
  • spark-class。。。無窮無盡,以後在細學吧。簡單的說它會設定大量的環境變數,以及一些classpath和jvm引數。

3.回到正題:(這段也沒啥可看的)

object ModelSelectionViaCrossValidationExample {

  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)

    val spark = SparkSession
      .builder
      .master("local")
      .appName("ModelSelectionViaCrossValidationExample")
      .getOrCreate()

    // $example on$
    // Prepare training data from a list of (id, text, label) tuples.
    val training = spark.createDataFrame(Seq(
      (0L, "a b c d e spark", 1.0),
      (1L, "b d", 0.0),
      (2L, "spark f g h", 1.0),
      (3L, "hadoop mapreduce", 0.0),
      (4L, "b spark who", 1.0),
      (5L, "g d a y", 0.0),
      (6L, "spark fly", 1.0),
      (7L, "was mapreduce", 0.0),
      (8L, "e spark program", 1.0),
      (9L, "a e c l", 0.0),
      (10L, "spark compile", 1.0),
      (11L, "hadoop software", 0.0)
    )).toDF("id", "text", "label")

4.配置ML pipeline, 該流水線包括三個階段: tokenizer(分詞器), hashingTF, lr

    val tokenizer = new Tokenizer()
      .setInputCol("text")
      .setOutputCol("words")
    val hashingTF = new HashingTF()
      .setInputCol(tokenizer.getOutputCol)
      .setOutputCol("features")
    val lr = new LogisticRegression()
      .setMaxIter(10) // 迭代次數,預設100
    val pipeline = new Pipeline()
      .setStages(Array(tokenizer, hashingTF, lr)) // 函式原型:setStages(value: Array[_ <: PipelineStage]): Pipeline.this.type

5.

  • 使用ParamGridBuilder 來構建 a grid of parameters to search over.
  • hashingTF.numFeatures設定三個值, lr.regParam設定兩個值
  • 這樣一來,對於CrossValidator,就可以評估上面設定的6個引數,也可以認為可以學習6種模型(模型執行是使用“fit”方法,2.0.0版本加入)
  • hashingTF:目前使用Austin Appleby的MurmurHash 3演算法————將詞對映為詞頻
val paramGrid = new ParamGridBuilder()
      .addGrid(hashingTF.numFeatures, Array(10, 100, 1000)) // Features的數量,預設為2的18次方。
      .addGrid(lr.regParam, Array(0.1, 0.01)) // 正則化引數,預設0.0
      .build() // this grid will have 3 x 2 = 6 parameter settings for CrossValidator to choose from.

6.

  • 現在將Pipeline視為一個Estimator(進行演算法或管道調-整), 把它包裹進一個CrossValidator例項
    這會在所有的Pipeline stages中調整引數
  • 所有的模型選擇器都需要:
  • 一個Estimator【之前設定的流水線】, 一個Estimator引數集【之前設定的paramGrid】, 一個Evaluator【評估者:衡量擬合模型對延伸測試資料有多好的度量】.
  • 這裡的BinaryClassificationEvaluator是預設的(我修改為了RegressionEvaluator)
  • 模型評估工具有————迴歸:RegressionEvaluator;二進位制資料:BinaryClassificationEvaluator;多類問題:MulticlassClassificationEvaluator
val cv = new CrossValidator() // 模型選擇工具有: CrossValidator和TrainValidationSplit
      .setEstimator(pipeline)
      .setEvaluator(new RegressionEvaluator) // 預設是BinaryClassificationEvaluator
      .setEstimatorParamMaps(paramGrid)
      .setNumFolds(2)  // 生成2個(訓練,測試)資料集對,其中訓練資料佔比2/3,測試資料佔1/3
      //詳:【引數值必須>=2,預設3】。資料集對是不重疊的隨機拆分,每對中的測試資料僅測試一次。
      //所以資料集大時,我們可以設定得高一些,但是資料集小時,可能會過擬合。TODO:不能整除怎麼辦?

7.

    val cvModel = cv.fit(training) // 對於這裡,返回一個CrossValidatorModel

    // 準備測試集
    val test = spark.createDataFrame(Seq(
      (4L, "spark i j k"),
      (5L, "l m n"),
      (6L, "mapreduce spark"),
      (7L, "apache hadoop")
    )).toDF("id", "text")

    // 使用剛才學習出來的模型對測試集進行預測
    cvModel.transform(test) // transform對資料進行轉換,返回一個DataFrame。"probability""prediction"
      .select("id", "text", "probability", "prediction")
      .collect()
      .foreach { case Row(id: Long, text: String, prob: Vector, prediction: Double) =>
        println(s"($id, $text) --> prob=$prob, prediction=$prediction")
      }
    spark.stop()
  }
}

TODO:返回的DataFrame是如何構造的?(”probability”, “prediction”)
stackoverflow有人解釋說:
RawPrediction通常是直接用概率/置信度計算出來的
prediction是rawPrediction.argmax的結果
Probability是條件概率:

Estimate the probability of each class given the raw prediction,
doing the computation in-place. These predictions are also called class conditional probabilities.

還是比較模糊,我還想知道的是該DataSet是如何構造的。

學習後(實踐):

import org.apache.log4j.{Level, Logger}
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

/**
  * @author 王海[https://github.com/AtTops]
  *         package main.scala.firstput
  *         description 這裡沒有結合字串型別(使用StringIndexer);
  *         沒有處理分類變數(使用VectorIndexer(向量型別索引化));也沒有使用獨熱編碼OneHotEncoder;
  *         Date 2018/1/4 20:40
  *         Version V1.0
  */
object LrPipline {
  val myTrainCsvPath: String = ""
  val myCVPath: String = ""
  val myTestCsvPath: String = ""
  val resultsPath: String = ""

  /**
    * 目前可以使用CrossValidator和TrainValidationSplit這兩種方式調整引數,前者慢後者快,
    * 但是後者在資料集小時表現得可能不好(TrainValidationSplit只對一次引數的每個組合進行評估,而在CrossValidator的情況下則為k次。)
    *
    * @param spark
    */
  def piplineWithCrossValidator(spark: SparkSession): Unit = {
    var rawTrainDf: DataFrame = spark.read.format("csv").option("header", true).option("inferSchema", true).load(myTrainCsvPath)
    rawTrainDf.printSchema()
    rawTrainDf = rawTrainDf.withColumnRenamed("sale_quantity", "label").na.drop()

    // 將多個特徵變數合併成一個特徵變數,以用於輸入之後的模型
    val calArray = Array("sale_date", "class_id", "brand_id", "compartment", "type_id", "department_id", "TR", "displacement", "driven_type_id", "emission_standards_id", "if_MPV_id", "if_luxurious_id", "cylinder_number", "engine_torque", "car_length", "car_height", "total_quality", "rear_track")
    val assembler = new VectorAssembler() // 設定哪些是特徵,剩下的是標籤,但是還沒有開始轉換
      .setInputCols(calArray)
      .setOutputCol("features")
    val output = assembler.transform(rawTrainDf)
    output.select("features", "label").show

    // 初步建立線性迴歸模型
    val lr = new LinearRegression()
    //      .setFeaturesCol("features") // 也可以不這樣做,因為之前的assemble人已經指明瞭哪些是特徵列。後面的setStages(Array(lr))用setStages(Array(assembler,lr))代替
    //      .setLabelCol("sale_quantity") // 所有數值型別會自動轉為double型(所以不用自己去處理為double型)
    //      .fit(output)

    // 使用ParamGridBuilder構建引數網格
    val paramMap = new ParamGridBuilder() // 這些引數可以在LinearRegression類中的Parameters找到
      .addGrid(lr.regParam, Array(0.1, 0.01)) // 正則化引數,預設0.0
      .addGrid(lr.elasticNetParam, Array(0.1, 0.5, 0.9)) // 給L1、L2正則化的結合版設定3種需要嘗試的引數(預設0.0,僅L2正則)
      .addGrid(lr.maxIter, Array(10, 20))
      .build()

    // 構建管道,把各個階段連線在一起(這裡就一個)
    val pipeline = new Pipeline()
      .setStages(Array(assembler, lr))

    // 模型評估以及選擇
    val cv = new CrossValidator()
      .setEstimator(pipeline)
      .setEvaluator(new RegressionEvaluator)
      .setEstimatorParamMaps(paramMap)
      .setNumFolds(4)

    val cvModel = cv.fit(rawTrainDf)
    // 列印模型相關引數
    println("extractParamMap=============================")
    val params: ParamMap = cvModel.extractParamMap() // 列印“凍結”的所有引數
    params.toSeq.foreach {
      print
    }// cvModel.getDefault()
    println("extractParamMap.length=============================")
    println(cvModel.getEstimatorParamMaps.length)
    println("foreach=============================")
    cvModel.getEstimatorParamMaps.foreach {
      println
    } // 引數組合的集合
    println(cvModel.getEvaluator.isLargerBetter) // 評估的度量值是大的好,還是小的好

    // 測試集準備
    var rawTestDf: DataFrame = spark.read.format("csv").option("header", true).option("inferSchema", true).load(myTestCsvPath)
    rawTestDf = rawTestDf.drop("sale_quantity").na.drop()
    // 測試並且列印結果
    println("開始測試===================")
    val results: DataFrame = cvModel.transform(rawTestDf)
    println("列印預測結果===================")
    results.select("class_id", "prediction")
      .collect()
      .foreach { case Row(id: Double, prediction: Double) =>
        println(s"$id ----------> prediction=$prediction")
      }

    val save = results.select("class_id", "prediction") // 不轉換一下的話,報錯說csv資料來源型別錯誤
    save.coalesce(1)
      .write
      .mode("overwrite")
      .option("header", true)
      .format("csv")
      .save(resultsPath)
    println(s"預測結果儲存完畢,儲存路徑————————>:$resultsPath")

    // 儲存模型
    cvModel.write.overwrite().save(myCVPath)

    spark.stop()
  }

  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)

    val spark = SparkSession.builder()
      .master("local")
      .appName("SomeStatistics")
      .getOrCreate()

    piplineWithCrossValidator(spark)
  }

}

預測結果:
這裡寫圖片描述
儲存的模型:
這裡寫圖片描述

總結

Spark機器學習中的小框架:

Spark中的LogisticRegression:
1.預設會對資料集進行標準化
TODO:具體採用哪種方法進行標準化,怎樣手動選擇其他標準化方法

 def setStandardization(value: Boolean): this.type = set(standardization, value)
  setDefault(standardization -> true)

2.如果不設定權重,則將所的權值設定為1.0
TODO:引數怎麼是String型別的?

def setWeightCol(value: String): this.type = set(weightCol, value)

3.平方誤差函式為:
這裡寫圖片描述
4.支援使用setElasticNetParam設定alpha的值:(預設0.0)
none(最小二乘法)
L2正則化 (嶺迴歸)——alpha=0
L1正則化 (Lasso)——alpha=1
L2 + L1 正則化(elastic net)——alpha=(0,1)

TrainValidationSplit和CrossValidator:
1.TrainValidationSplit僅對引數的每個組合進行一次評估,而在CrossValidator的情況下,是k次(k=getNumFolds)。 因此,它較快,但在訓練資料集不夠大時可能不會產生可靠的結果。
2. CrossValidator使用setNumFolds設定交叉集(預設3);TrainValidationSplit使用setTrainRatio設定訓練集和測試集的比率(無預設值)。

儲存和載入模型:

Please refer to the algorithm’s API documentation to see if saving and loading is supported.
。。。

CrossValidator就找見save沒找見load。。。

參考