
Random Forest Regression: Algorithm Principles and Spark MLlib Invocation Examples (Scala/Java/Python)

Random Forest Regression

Algorithm Introduction:

Random forests are an ensemble algorithm built on decision trees. A random forest combines many decision trees to reduce the risk of overfitting, while retaining the strengths of decision trees: they are easy to interpret, handle categorical features, extend naturally to the multiclass setting, and require no feature scaling.

A random forest trains its decision trees independently, so the training process can run in parallel. Because randomness is injected into training, the individual trees differ slightly from one another. Combining the predictions of the trees reduces the variance of the ensemble's predictions and improves performance on the test set.

Randomness enters in two places:

1. At each iteration, the original data is subsampled to produce a different training set for each tree.

2. At each tree node, a different random subset of the features is considered when choosing the split.

Beyond these two sources of randomness, each decision tree is trained in the same way as a standalone decision tree; the sketch below shows how these two sources of randomness surface as parameters.
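A minimal Scala sketch of how the two sources of randomness map onto spark.ml parameters (the values below are illustrative assumptions, not recommendations):

import org.apache.spark.ml.regression.RandomForestRegressor

// Randomness source 1: each tree sees a random subsample of the training rows.
// Randomness source 2: each split considers a random subset of the features.
val rf = new RandomForestRegressor()
  .setSubsamplingRate(0.8)              // fraction of rows sampled per tree (illustrative)
  .setFeatureSubsetStrategy("onethird") // features considered at each split (illustrative)
  .setSeed(42L)                         // fix the seed to make the randomness reproducible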

To predict on a new instance, a random forest aggregates the predictions of its decision trees. The aggregation differs slightly between classification and regression. Classification uses majority voting: each tree votes for one class, and the class receiving the most votes is the final prediction. For regression, each tree predicts a real value, and the final prediction is the average of the trees' predictions.
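The averaging rule can be checked directly against the individual trees. A sketch, assuming Spark 3.x (where the single-instance predict() method is public), a fitted RandomForestRegressionModel named rfModel, and a hypothetical feature vector v:

import org.apache.spark.ml.linalg.Vectors

// v is a hypothetical feature vector in the same space the model was trained on.
val v = Vectors.dense(0.0, 1.0, 2.0)
val perTree = rfModel.trees.map(_.predict(v))      // one real-valued prediction per tree
val forest = perTree.sum / rfModel.getNumTrees     // for regression, the forest averages them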

spark.ml supports random forests for binary classification, multiclass classification, and regression, and handles both continuous and categorical features.
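For completeness, the classification counterpart lives alongside the regressor in spark.ml. A minimal sketch of instantiating each (the column names are the spark.ml defaults):

import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.regression.RandomForestRegressor

// Same forest machinery, different task: majority voting vs. averaging.
val clf = new RandomForestClassifier().setLabelCol("label").setFeaturesCol("features")
val reg = new RandomForestRegressor().setLabelCol("label").setFeaturesCol("features")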

Parameters:

checkpointInterval: Int. Checkpoint interval (>= 1), or -1 to disable checkpointing.

featureSubsetStrategy: String. Number of features to consider as split candidates at each tree node.

featuresCol: String. Name of the features column.

impurity: String. Criterion used for information gain calculation (case-insensitive); for regression, "variance" is the supported value.

labelCol: String. Name of the label column.

maxBins: Int. Maximum number of bins for discretizing continuous features, and for choosing how to split on features at each node.

maxDepth: Int. Maximum depth of each tree (>= 0).

minInfoGain: Double. Minimum information gain required to split a node.

minInstancesPerNode: Int. Minimum number of instances each child node must have after a split.

numTrees: Int. Number of trees to train.

predictionCol: String. Name of the prediction column.

seed: Long. Random seed.

subsamplingRate: Double. Fraction of the training data used to learn each decision tree, in range (0, 1].

thresholds: Array[Double]. Thresholds in multiclass classification used to adjust the predicted probability of each class.
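To make the parameter list concrete, here is a hedged Scala sketch that sets most of these parameters explicitly on a RandomForestRegressor (every value is an illustrative assumption, not a recommendation):

import org.apache.spark.ml.regression.RandomForestRegressor

val rf = new RandomForestRegressor()
  .setLabelCol("label")
  .setFeaturesCol("features")
  .setPredictionCol("prediction")
  .setNumTrees(50)            // illustrative: number of trees to train
  .setMaxDepth(8)             // maximum depth of each tree (>= 0)
  .setMaxBins(32)             // max bins for discretizing continuous features
  .setMinInstancesPerNode(2)  // minimum instances each child must have after a split
  .setMinInfoGain(0.0)        // minimum information gain required to split
  .setImpurity("variance")    // the impurity criterion supported for regression
  .setSubsamplingRate(1.0)    // fraction of training data per tree, in (0, 1]
  .setCheckpointInterval(10)  // checkpoint every 10 iterations; -1 disables
  .setSeed(1234L)             // random seed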

Invocation Examples:

The examples below load a dataset in LibSVM format and split it into training and test sets; the first portion is used for training and the remainder for testing. Before training, we apply VectorIndexer to transform the features, which adds metadata to the DataFrame that the tree-based algorithm can recognize.

Scala:

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.feature.VectorIndexer
import org.apache.spark.ml.regression.{RandomForestRegressionModel, RandomForestRegressor}

// Load and parse the data file, converting it to a DataFrame.
val data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

// Automatically identify categorical features, and index them.
// Set maxCategories so features with > 4 distinct values are treated as continuous.
val featureIndexer = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexedFeatures")
  .setMaxCategories(4)
  .fit(data)

// Split the data into training and test sets (30% held out for testing).
val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3))

// Train a RandomForest model.
val rf = new RandomForestRegressor()
  .setLabelCol("label")
  .setFeaturesCol("indexedFeatures")

// Chain indexer and forest in a Pipeline.
val pipeline = new Pipeline()
  .setStages(Array(featureIndexer, rf))

// Train model. This also runs the indexer.
val model = pipeline.fit(trainingData)

// Make predictions.
val predictions = model.transform(testData)

// Select example rows to display.
predictions.select("prediction", "label", "features").show(5)

// Select (prediction, true label) and compute test error.
val evaluator = new RegressionEvaluator()
  .setLabelCol("label")
  .setPredictionCol("prediction")
  .setMetricName("rmse")
val rmse = evaluator.evaluate(predictions)
println("Root Mean Squared Error (RMSE) on test data = " + rmse)

val rfModel = model.stages(1).asInstanceOf[RandomForestRegressionModel]
println("Learned regression forest model:\n" + rfModel.toDebugString)

Java:

import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.PipelineModel;
import org.apache.spark.ml.PipelineStage;
import org.apache.spark.ml.evaluation.RegressionEvaluator;
import org.apache.spark.ml.feature.VectorIndexer;
import org.apache.spark.ml.feature.VectorIndexerModel;
import org.apache.spark.ml.regression.RandomForestRegressionModel;
import org.apache.spark.ml.regression.RandomForestRegressor;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Load and parse the data file, converting it to a DataFrame.
Dataset<Row> data = spark.read().format("libsvm").load("data/mllib/sample_libsvm_data.txt");

// Automatically identify categorical features, and index them.
// Set maxCategories so features with > 4 distinct values are treated as continuous.
VectorIndexerModel featureIndexer = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexedFeatures")
  .setMaxCategories(4)
  .fit(data);

// Split the data into training and test sets (30% held out for testing)
Dataset<Row>[] splits = data.randomSplit(new double[] {0.7, 0.3});
Dataset<Row> trainingData = splits[0];
Dataset<Row> testData = splits[1];

// Train a RandomForest model.
RandomForestRegressor rf = new RandomForestRegressor()
  .setLabelCol("label")
  .setFeaturesCol("indexedFeatures");

// Chain indexer and forest in a Pipeline
Pipeline pipeline = new Pipeline()
  .setStages(new PipelineStage[] {featureIndexer, rf});

// Train model. This also runs the indexer.
PipelineModel model = pipeline.fit(trainingData);

// Make predictions.
Dataset<Row> predictions = model.transform(testData);

// Select example rows to display.
predictions.select("prediction", "label", "features").show(5);

// Select (prediction, true label) and compute test error
RegressionEvaluator evaluator = new RegressionEvaluator()
  .setLabelCol("label")
  .setPredictionCol("prediction")
  .setMetricName("rmse");
double rmse = evaluator.evaluate(predictions);
System.out.println("Root Mean Squared Error (RMSE) on test data = " + rmse);

RandomForestRegressionModel rfModel = (RandomForestRegressionModel)(model.stages()[1]);
System.out.println("Learned regression forest model:\n" + rfModel.toDebugString());

Python:

from pyspark.ml import Pipeline
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator

# Load and parse the data file, converting it to a DataFrame.
data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

# Automatically identify categorical features, and index them.
# Set maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer =\
    VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)

# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])

# Train a RandomForest model.
rf = RandomForestRegressor(featuresCol="indexedFeatures")

# Chain indexer and forest in a Pipeline
pipeline = Pipeline(stages=[featureIndexer, rf])

# Train model.  This also runs the indexer.
model = pipeline.fit(trainingData)

# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("prediction", "label", "features").show(5)

# Select (prediction, true label) and compute test error
evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

rfModel = model.stages[1]
print(rfModel)  # summary only