從examples中學spark(二):ModelSelectionViaCrossValidationExample.scala以及模型儲存
學習前(理論)
最小二乘法、嶺迴歸、Lasso等
學習中(領悟)
1.首先是一段包匯入,跳過
import org.apache.log4j.{Level, Logger}
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache .spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession
/**
* A simple example demonstrating model selection using CrossValidator.
* This example also demonstrates how Pipelines are Estimators.
*
* Run with
* {{{
* bin/run-example ml.ModelSelectionViaCrossValidationExample
* }}}
*
*/
2.看一下run-example這個指令碼:
if [ -z "${SPARK_HOME}" ]; then
source "$(dirname "$0")"/find-spark-home
fi
export _SPARK_CMD_USAGE="Usage: ./bin/run-example [options] example-class [example args]"
exec "${SPARK_HOME}"/bin/spark-submit run-example "[email protected]"
- 首先,[ -z “${SPARK_HOME}” ]判斷當前環境有沒有設定SPARK_HOME,(-z判斷後面引數的長度是否為0)。
- dirname取指定路徑所在的目錄,保留最後一個/前面的字元,刪除其他部分,並寫結果到標準輸出。如果最後一個/後無字元,dirname 命令使用倒數第二個/,並忽略其後的所有字元。
- $0表示當前執行的命令名,source “
(dirname" 0”)”/find-spark-home就是執行find-spark-home的指令碼。source不會建立新程序。 - 檢視find-spark-home。就是想方設法export SPARK_HOME
- 然後執行spark-submit指令碼,spark-submit會呼叫spark-class執行org.apache.spark.deploy.SparkSubmit
- spark-class。。。無窮無盡,以後在細學吧。簡單的說它會設定大量的環境變數,以及一些classpath和jvm引數。
3.回到正題:(這段也沒啥可看的)
object ModelSelectionViaCrossValidationExample {
def main(args: Array[String]): Unit = {
Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
val spark = SparkSession
.builder
.master("local")
.appName("ModelSelectionViaCrossValidationExample")
.getOrCreate()
// $example on$
// Prepare training data from a list of (id, text, label) tuples.
val training = spark.createDataFrame(Seq(
(0L, "a b c d e spark", 1.0),
(1L, "b d", 0.0),
(2L, "spark f g h", 1.0),
(3L, "hadoop mapreduce", 0.0),
(4L, "b spark who", 1.0),
(5L, "g d a y", 0.0),
(6L, "spark fly", 1.0),
(7L, "was mapreduce", 0.0),
(8L, "e spark program", 1.0),
(9L, "a e c l", 0.0),
(10L, "spark compile", 1.0),
(11L, "hadoop software", 0.0)
)).toDF("id", "text", "label")
4.配置ML pipeline, 該流水線包括三個階段: tokenizer(分詞器), hashingTF, lr
val tokenizer = new Tokenizer()
.setInputCol("text")
.setOutputCol("words")
val hashingTF = new HashingTF()
.setInputCol(tokenizer.getOutputCol)
.setOutputCol("features")
val lr = new LogisticRegression()
.setMaxIter(10) // 迭代次數,預設100
val pipeline = new Pipeline()
.setStages(Array(tokenizer, hashingTF, lr)) // 函式原型:setStages(value: Array[_ <: PipelineStage]): Pipeline.this.type
5.
- 使用ParamGridBuilder 來構建 a grid of parameters to search over.
- hashingTF.numFeatures設定三個值, lr.regParam設定兩個值
- 這樣一來,對於CrossValidator,就可以評估上面設定的6個引數,也可以認為可以學習6種模型(模型執行是使用“fit”方法,2.0.0版本加入)
- hashingTF:目前使用Austin Appleby的MurmurHash 3演算法————將詞對映為詞頻
val paramGrid = new ParamGridBuilder()
.addGrid(hashingTF.numFeatures, Array(10, 100, 1000)) // Features的數量,預設為2的18次方。
.addGrid(lr.regParam, Array(0.1, 0.01)) // 正則化引數,預設0.0
.build() // this grid will have 3 x 2 = 6 parameter settings for CrossValidator to choose from.
6.
- 現在將Pipeline視為一個Estimator(進行演算法或管道調-整), 把它包裹進一個CrossValidator例項
這會在所有的Pipeline stages中調整引數 - 所有的模型選擇器都需要:
- 一個Estimator【之前設定的流水線】, 一個Estimator引數集【之前設定的paramGrid】, 一個Evaluator【評估者:衡量擬合模型對延伸測試資料有多好的度量】.
- 這裡的BinaryClassificationEvaluator是預設的(我修改為了RegressionEvaluator)
- 模型評估工具有————迴歸:RegressionEvaluator;二進位制資料:BinaryClassificationEvaluator;多類問題:MulticlassClassificationEvaluator
val cv = new CrossValidator() // 模型選擇工具有: CrossValidator和TrainValidationSplit
.setEstimator(pipeline)
.setEvaluator(new RegressionEvaluator) // 預設是BinaryClassificationEvaluator
.setEstimatorParamMaps(paramGrid)
.setNumFolds(2) // 生成2個(訓練,測試)資料集對,其中訓練資料佔比2/3,測試資料佔1/3
//詳:【引數值必須>=2,預設3】。資料集對是不重疊的隨機拆分,每對中的測試資料僅測試一次。
//所以資料集大時,我們可以設定得高一些,但是資料集小時,可能會過擬合。TODO:不能整除怎麼辦?
7.
val cvModel = cv.fit(training) // 對於這裡,返回一個CrossValidatorModel
// 準備測試集
val test = spark.createDataFrame(Seq(
(4L, "spark i j k"),
(5L, "l m n"),
(6L, "mapreduce spark"),
(7L, "apache hadoop")
)).toDF("id", "text")
// 使用剛才學習出來的模型對測試集進行預測
cvModel.transform(test) // transform對資料進行轉換,返回一個DataFrame。"probability"和"prediction"
.select("id", "text", "probability", "prediction")
.collect()
.foreach { case Row(id: Long, text: String, prob: Vector, prediction: Double) =>
println(s"($id, $text) --> prob=$prob, prediction=$prediction")
}
spark.stop()
}
}
TODO:返回的DataFrame是如何構造的?(”probability”, “prediction”)
stackoverflow有人解釋說:
RawPrediction通常是直接用概率/置信度計算出來的
prediction是rawPrediction.argmax的結果
Probability是條件概率:
Estimate the probability of each class given the raw prediction,
doing the computation in-place. These predictions are also called class conditional probabilities.
還是比較模糊,我還想知道的是該DataSet是如何構造的。
學習後(實踐):
import org.apache.log4j.{Level, Logger}
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
/**
* @author 王海[https://github.com/AtTops]
* package main.scala.firstput
* description 這裡沒有結合字串型別(使用StringIndexer);
* 沒有處理分類變數(使用VectorIndexer(向量型別索引化));也沒有使用獨熱編碼OneHotEncoder;
* Date 2018/1/4 20:40
* Version V1.0
*/
object LrPipline {
val myTrainCsvPath: String = ""
val myCVPath: String = ""
val myTestCsvPath: String = ""
val resultsPath: String = ""
/**
* 目前可以使用CrossValidator和TrainValidationSplit這兩種方式調整引數,前者慢後者快,
* 但是後者在資料集小時表現得可能不好(TrainValidationSplit只對一次引數的每個組合進行評估,而在CrossValidator的情況下則為k次。)
*
* @param spark
*/
def piplineWithCrossValidator(spark: SparkSession): Unit = {
var rawTrainDf: DataFrame = spark.read.format("csv").option("header", true).option("inferSchema", true).load(myTrainCsvPath)
rawTrainDf.printSchema()
rawTrainDf = rawTrainDf.withColumnRenamed("sale_quantity", "label").na.drop()
// 將多個特徵變數合併成一個特徵變數,以用於輸入之後的模型
val calArray = Array("sale_date", "class_id", "brand_id", "compartment", "type_id", "department_id", "TR", "displacement", "driven_type_id", "emission_standards_id", "if_MPV_id", "if_luxurious_id", "cylinder_number", "engine_torque", "car_length", "car_height", "total_quality", "rear_track")
val assembler = new VectorAssembler() // 設定哪些是特徵,剩下的是標籤,但是還沒有開始轉換
.setInputCols(calArray)
.setOutputCol("features")
val output = assembler.transform(rawTrainDf)
output.select("features", "label").show
// 初步建立線性迴歸模型
val lr = new LinearRegression()
// .setFeaturesCol("features") // 也可以不這樣做,因為之前的assemble人已經指明瞭哪些是特徵列。後面的setStages(Array(lr))用setStages(Array(assembler,lr))代替
// .setLabelCol("sale_quantity") // 所有數值型別會自動轉為double型(所以不用自己去處理為double型)
// .fit(output)
// 使用ParamGridBuilder構建引數網格
val paramMap = new ParamGridBuilder() // 這些引數可以在LinearRegression類中的Parameters找到
.addGrid(lr.regParam, Array(0.1, 0.01)) // 正則化引數,預設0.0
.addGrid(lr.elasticNetParam, Array(0.1, 0.5, 0.9)) // 給L1、L2正則化的結合版設定3種需要嘗試的引數(預設0.0,僅L2正則)
.addGrid(lr.maxIter, Array(10, 20))
.build()
// 構建管道,把各個階段連線在一起(這裡就一個)
val pipeline = new Pipeline()
.setStages(Array(assembler, lr))
// 模型評估以及選擇
val cv = new CrossValidator()
.setEstimator(pipeline)
.setEvaluator(new RegressionEvaluator)
.setEstimatorParamMaps(paramMap)
.setNumFolds(4)
val cvModel = cv.fit(rawTrainDf)
// 列印模型相關引數
println("extractParamMap=============================")
val params: ParamMap = cvModel.extractParamMap() // 列印“凍結”的所有引數
params.toSeq.foreach {
print
}// cvModel.getDefault()
println("extractParamMap.length=============================")
println(cvModel.getEstimatorParamMaps.length)
println("foreach=============================")
cvModel.getEstimatorParamMaps.foreach {
println
} // 引數組合的集合
println(cvModel.getEvaluator.isLargerBetter) // 評估的度量值是大的好,還是小的好
// 測試集準備
var rawTestDf: DataFrame = spark.read.format("csv").option("header", true).option("inferSchema", true).load(myTestCsvPath)
rawTestDf = rawTestDf.drop("sale_quantity").na.drop()
// 測試並且列印結果
println("開始測試===================")
val results: DataFrame = cvModel.transform(rawTestDf)
println("列印預測結果===================")
results.select("class_id", "prediction")
.collect()
.foreach { case Row(id: Double, prediction: Double) =>
println(s"$id ----------> prediction=$prediction")
}
val save = results.select("class_id", "prediction") // 不轉換一下的話,報錯說csv資料來源型別錯誤
save.coalesce(1)
.write
.mode("overwrite")
.option("header", true)
.format("csv")
.save(resultsPath)
println(s"預測結果儲存完畢,儲存路徑————————>:$resultsPath")
// 儲存模型
cvModel.write.overwrite().save(myCVPath)
spark.stop()
}
def main(args: Array[String]): Unit = {
Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
val spark = SparkSession.builder()
.master("local")
.appName("SomeStatistics")
.getOrCreate()
piplineWithCrossValidator(spark)
}
}
預測結果:
儲存的模型:
總結
Spark機器學習中的小框架:
Spark中的LogisticRegression:
1.預設會對資料集進行標準化
TODO:具體採用哪種方法進行標準化,怎樣手動選擇其他標準化方法
def setStandardization(value: Boolean): this.type = set(standardization, value)
setDefault(standardization -> true)
2.如果不設定權重,則將所的權值設定為1.0
TODO:引數怎麼是String型別的?
def setWeightCol(value: String): this.type = set(weightCol, value)
3.平方誤差函式為:
4.支援使用setElasticNetParam設定alpha的值:(預設0.0)
none(最小二乘法)
L2正則化 (嶺迴歸)——alpha=0
L1正則化 (Lasso)——alpha=1
L2 + L1 正則化(elastic net)——alpha=(0,1)
TrainValidationSplit和CrossValidator:
1.TrainValidationSplit僅對引數的每個組合進行一次評估,而在CrossValidator的情況下,是k次(k=getNumFolds)。 因此,它較快,但在訓練資料集不夠大時可能不會產生可靠的結果。
2. CrossValidator使用setNumFolds設定交叉集(預設3);TrainValidationSplit使用setTrainRatio設定訓練集和測試集的比率(無預設值)。
儲存和載入模型:
Please refer to the algorithm’s API documentation to see if saving and loading is supported.
。。。
CrossValidator就找見save沒找見load。。。