Machine Learning with PySpark (Classification)

LogisticRegression

class pyspark.ml.classification.LogisticRegression(self, featuresCol="features", labelCol="label", predictionCol="prediction", maxIter=100, regParam=0.0, elasticNetParam=0.0, tol=1e-6, fitIntercept=True, threshold=0.5, thresholds=None, probabilityCol="probability", rawPredictionCol="rawPrediction", standardization=True, weightCol=None, aggregationDepth=2, family="auto")

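Parameters

regParam: regularization parameter (>= 0)
elasticNetParam: ElasticNet mixing parameter alpha, in the range [0, 1]; alpha = 0 gives an L2 penalty and alpha = 1 gives an L1 penalty
fitIntercept: whether to fit an intercept term
standardization: whether to standardize the training features before fitting the model
aggregationDepth: suggested depth for treeAggregate (>= 2)
family: name of the family describing the label distribution; options are "auto", "binomial" and "multinomial"
Each parameter has getter and setter methods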
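
To make the interplay of regParam and elasticNetParam concrete, here is a minimal pure-Python sketch of the elastic net penalty. It assumes the usual formulation regParam * (alpha * ||w||_1 + (1 - alpha) / 2 * ||w||_2^2); the function name elastic_net_penalty and the sample weights are made up for illustration and are not part of the PySpark API.

```python
def elastic_net_penalty(weights, reg_param, elastic_net_param):
    """Penalty term: reg_param * (alpha * L1 + (1 - alpha) / 2 * L2 squared)."""
    alpha = elastic_net_param
    l1 = sum(abs(w) for w in weights)
    l2_squared = sum(w * w for w in weights)
    return reg_param * (alpha * l1 + (1.0 - alpha) / 2.0 * l2_squared)


weights = [0.5, -2.0]  # hypothetical coefficients
ridge = elastic_net_penalty(weights, reg_param=0.1, elastic_net_param=0.0)  # pure L2
lasso = elastic_net_penalty(weights, reg_param=0.1, elastic_net_param=1.0)  # pure L1
mixed = elastic_net_penalty(weights, reg_param=0.1, elastic_net_param=0.5)  # blend of both
print(ridge, lasso, mixed)
```

With elasticNetParam between 0 and 1 the penalty is a weighted blend of the two, which is why the value is called a mixing parameter.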

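Methods and attributes of the fitted model

coefficientMatrix: coefficient matrix of the model (used in the multinomial case)
coefficients: model coefficients of binomial logistic regression; an exception is raised if the model was trained as multinomial logistic regression
evaluate(dataset): evaluates the model on a test dataset
hasSummary: whether a training summary exists
intercept: intercept of the binomial logistic regression model
interceptVector: intercepts of the multinomial logistic regression model
summary: returns the training summary
transform(dataset, params=None)

Attributes of the summary

predictions: the DataFrame of predictions output by the model's transform method
probabilityCol: name of the field in predictions that gives the probability of each class

A binomial summary additionally has:

areaUnderROC: the area under the ROC curve (AUC)
fMeasureByThreshold: DataFrame with two fields (threshold, F-measure), computed with beta = 1.0
pr: DataFrame with two fields (recall, precision)
precisionByThreshold: DataFrame with two fields (threshold, precision); every distinct probability in the transformed data is used as a threshold when computing precision
recallByThreshold: DataFrame with two fields (threshold, recall); every distinct probability in the transformed data is used as a threshold when computing recall
roc: DataFrame with two fields (FPR, TPR)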
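
The per-threshold metrics are easier to follow with a small worked example. The sketch below is plain Python rather than Spark code: it treats every distinct predicted probability as a threshold and assumes a row counts as positive when its probability is greater than or equal to that threshold. The helper name and the toy data are invented for illustration.

```python
def precision_recall_by_threshold(probabilities, labels):
    """Return (threshold, precision, recall) tuples, one per distinct probability."""
    total_positives = sum(1 for y in labels if y == 1)
    rows = []
    for threshold in sorted(set(probabilities), reverse=True):
        predicted = [p >= threshold for p in probabilities]
        tp = sum(1 for hit, y in zip(predicted, labels) if hit and y == 1)
        fp = sum(1 for hit, y in zip(predicted, labels) if hit and y == 0)
        precision = tp / (tp + fp)  # tp + fp >= 1 because the threshold is itself a score
        recall = tp / total_positives if total_positives else 0.0
        rows.append((threshold, precision, recall))
    return rows


# Toy data: probability of class 1 and the true label for four rows
probabilities = [0.9, 0.8, 0.4, 0.2]
labels = [1, 0, 1, 0]
for row in precision_recall_by_threshold(probabilities, labels):
    print(row)
```

Lowering the threshold can only raise recall, while precision may move in either direction, which is the trade-off the pr curve visualizes.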

Code

from pyspark.ml.linalg import Vectors
from pyspark.sql import Row
from pyspark.ml.classification import LogisticRegression

# Binomial logistic regression
# (`sc` is assumed to be an active SparkContext, as in the pyspark shell)
bdf = sc.parallelize([
    Row(label=1.0, weight=2.0, features=Vectors.dense(1.0)),
    Row(label=0.0, weight=2.0, features=Vectors.sparse(1, [], []))]).toDF()
bdf.show()
blor = LogisticRegression(maxIter=5, regParam=0.01,weightCol='weight')
blorModel = blor.fit(bdf)
blorModel.coefficients
blorModel.intercept

# Multinomial logistic regression
mdf = sc.parallelize([
    Row(label=1.0, weight=2.0, features=Vectors.dense(1.0)),
    Row(label=0.0, weight=2.0, features=Vectors.sparse(1, [], [])),
    Row(label=2.0, weight=2.0, features=Vectors.dense(3.0))]).toDF()
mlor = LogisticRegression(maxIter=5, regParam=0.01, weightCol='weight', family='multinomial')
mlorModel = mlor.fit(mdf)
print(mlorModel.coefficientMatrix)
mlorModel.interceptVector

# Model prediction
test0=sc.parallelize([Row(features=Vectors.dense(-1.0))]).toDF()
result = blorModel.transform(test0).head()
result.prediction

result.probability
result.rawPrediction

test1 = sc.parallelize([Row(features=Vectors.sparse(1,[0],[1.0]))]).toDF()
blorModel.transform(test1).head().prediction
blorModel.transform(test1).show()
# Model evaluation
blorModel.summary.roc.show()
blorModel.summary.pr.show()
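
For readers wondering how prediction, probability and rawPrediction relate in the binomial case, the following is a rough pure-Python sketch. It assumes the standard logistic model: the margin is the dot product of coefficients and features plus the intercept, rawPrediction is [-margin, margin], and class 1 is predicted when its probability exceeds the threshold. The coefficient and intercept values are hypothetical, not the ones fitted above.

```python
import math


def predict_binomial(features, coefficients, intercept, threshold=0.5):
    """Return (rawPrediction, probability, prediction) for one feature vector."""
    margin = sum(w * x for w, x in zip(coefficients, features)) + intercept
    p1 = 1.0 / (1.0 + math.exp(-margin))  # sigmoid of the margin
    raw_prediction = [-margin, margin]
    probability = [1.0 - p1, p1]
    prediction = 1.0 if p1 > threshold else 0.0
    return raw_prediction, probability, prediction


# Hypothetical model with a single feature
coefficients, intercept = [2.0], -1.0
print(predict_binomial([-1.0], coefficients, intercept))  # margin -3.0, predicts 0.0
print(predict_binomial([1.0], coefficients, intercept))   # margin 1.0, predicts 1.0
```

Raising the threshold parameter makes the model more reluctant to predict class 1 without changing the probabilities themselves.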

DecisionTreeClassifier

class pyspark.ml.classification.DecisionTreeClassifier(self, featuresCol="features", labelCol="label", predictionCol="prediction", probabilityCol="probability", rawPredictionCol="rawPrediction", maxDepth=5, maxBins=32, minInstancesPerNode=1, minInfoGain=0.0, maxMemoryInMB=256, cacheNodeIds=False, checkpointInterval=10, impurity="gini", seed=None)

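Parameters

checkpointInterval: checkpoint interval (>= 1), or -1 to disable checkpointing; for example, 10 means the cache is checkpointed every 10 iterations
fit(dataset, params=None)
impurity: criterion used to compute information gain; options are "entropy" and "gini"
maxBins: maximum number of bins for discretizing continuous features; must be >= 2 and >= the number of categories of any categorical feature
maxDepth: maximum depth of the tree
minInfoGain: minimum information gain required to split a node
minInstancesPerNode: minimum number of instances per node
Each parameter has getter and setter methods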
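
The impurity and minInfoGain parameters are easier to reason about with numbers. Below is a small pure-Python sketch of Gini impurity, entropy (assumed here to use base-2 logarithms) and the information gain of a binary split. The helper names and class counts are illustrative only.

```python
import math


def gini(counts):
    """Gini impurity of a node given its per-class instance counts."""
    total = sum(counts)
    return 1.0 - sum((c / total) ** 2 for c in counts)


def entropy(counts):
    """Entropy (base 2) of a node given its per-class instance counts."""
    total = sum(counts)
    return -sum((c / total) * math.log2(c / total) for c in counts if c > 0)


def information_gain(parent, left, right, impurity=gini):
    """Parent impurity minus the instance-weighted impurity of the two children."""
    n = sum(parent)
    return (impurity(parent)
            - sum(left) / n * impurity(left)
            - sum(right) / n * impurity(right))


parent = [5, 5]                      # 5 instances of each class
print(information_gain(parent, [5, 0], [0, 5]))            # perfect split
print(information_gain(parent, [4, 1], [1, 4]))            # imperfect split
print(information_gain(parent, [5, 0], [0, 5], entropy))   # same split, entropy criterion
```

A candidate split whose gain falls below minInfoGain would be rejected, and the node would stay a leaf.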

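Methods and attributes of the fitted model

depth: depth of the decision tree
featureImportances: estimated importance of each feature, computed as follows:
    • importance of feature j = sum, over the nodes that split on feature j, of the information gain, where each gain is scaled by the number of instances passing through the node
    • the importances of the whole tree are normalized to sum to 1
transform(dataset, params=None)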
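
The two-step importance calculation described above can be sketched in a few lines of plain Python. The input format (one tuple of feature index, gain and instance count per internal node) and the numbers are assumptions made for this illustration; they are not how PySpark exposes tree nodes.

```python
def feature_importances(split_nodes, num_features):
    """split_nodes: (feature_index, gain, instance_count) for each internal node."""
    totals = [0.0] * num_features
    for feature_index, gain, instance_count in split_nodes:
        # Scale each node's gain by the number of instances passing through it
        totals[feature_index] += gain * instance_count
    norm = sum(totals)
    # Normalize so the importances of the whole tree sum to 1
    return [t / norm for t in totals] if norm > 0 else totals


# Hypothetical tree: the root splits on feature 0, its children on features 1 and 0
nodes = [(0, 0.20, 100), (1, 0.10, 60), (0, 0.05, 40)]
print(feature_importances(nodes, num_features=2))
```

A feature that is never used for a split ends up with importance 0, which matches the sparse vector in the example output below, where the single feature carries all of the importance.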

Code

from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import StringIndexer
from pyspark.ml.classification import DecisionTreeClassifier, DecisionTreeClassificationModel
df = spark.createDataFrame([(1.0, Vectors.dense(1.0)),(0.0, Vectors.sparse(1, [], []))], ["label", "features"])
stringIndexer = StringIndexer(inputCol="label", outputCol="indexed")
si_model = stringIndexer.fit(df)
td = si_model.transform(df)
dt = DecisionTreeClassifier(maxDepth=2, labelCol="indexed")
model = dt.fit(td)
model.numNodes
#3
model.depth
#1
model.featureImportances
#SparseVector(1, {0: 1.0})
model.numFeatures
#1
model.numClasses
#2
print(model.toDebugString)
#DecisionTreeClassificationModel (uid=...) of depth 1 with 3 nodes...
test0 = spark.createDataFrame([(Vectors.dense(-1.0),)], ["features"])
result = model.transform(test0).head()
result.prediction
#0.0
result.probability
#DenseVector([1.0, 0.0])
result.rawPrediction
#DenseVector([1.0, 0.0])
test1 = spark.createDataFrame([(Vectors.sparse(1, [0], [1.0]),)], ["features"])
model.transform(test1).head().prediction
#1.0
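import tempfile
temp_path = tempfile.mkdtemp()  # assumed scratch directory; the original snippet leaves temp_path undefined
dtc_path = temp_path + "/dtc"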
dt.save(dtc_path)
dt2 = DecisionTreeClassifier.load(dtc_path)
dt2.getMaxDepth()
#2
model_path = temp_path + "/dtc_model"
model.save(model_path)
model2 = DecisionTreeClassificationModel.load(model_path)
model.featureImportances == model2.featureImportances
#True

RandomForestClassifier

class pyspark.ml.classification.RandomForestClassifier(self, featuresCol="features", labelCol="label", predictionCol="prediction", probabilityCol="probability", rawPredictionCol="rawPrediction", maxDepth=5, maxBins=32, minInstancesPerNode=1, minInfoGain=0.0, maxMemoryInMB=256, cacheNodeIds=False, checkpointInterval=10, impurity="gini", numTrees=20, featureSubsetStrategy="auto", seed=None, subsamplingRate=1.0)

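Parameters

checkpointInterval: same as DecisionTreeClassifier
featureSubsetStrategy: number of features to consider for splits at each tree node; options are "auto", "all", "onethird", "sqrt", "log2", a fraction in (0.0, 1.0], or an integer in [1, n]
fit(dataset, params=None)
impurity: same as DecisionTreeClassifier
maxBins: same as DecisionTreeClassifier
maxDepth: same as DecisionTreeClassifier
minInfoGain: same as DecisionTreeClassifier
numTrees: number of trees to train
subsamplingRate: fraction of the training data used to train each decision tree, in the range (0, 1]
Each parameter has getter and setter methods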
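
To show what the featureSubsetStrategy options mean in practice, here is a pure-Python sketch that turns a strategy string into a feature count. It is an approximation, not Spark's code: it assumes that "auto" means "sqrt" when more than one tree is trained and "all" otherwise (the classification case), and that fractional results are rounded up. The function name is invented.

```python
import math


def features_per_split(strategy, num_features, num_trees=20):
    """Number of features considered at each split for a given strategy string."""
    if strategy == "auto":
        # Assumption: classification forests use sqrt, a single tree uses all features
        strategy = "sqrt" if num_trees > 1 else "all"
    if strategy == "all":
        return num_features
    if strategy == "sqrt":
        return math.ceil(math.sqrt(num_features))
    if strategy == "log2":
        return max(1, math.ceil(math.log2(num_features)))
    if strategy == "onethird":
        return math.ceil(num_features / 3.0)
    try:
        # An integer string such as "3" is an absolute feature count in [1, n]
        return min(int(strategy), num_features)
    except ValueError:
        # Otherwise a fraction string such as "0.5" in (0.0, 1.0]
        return math.ceil(float(strategy) * num_features)


for s in ["auto", "all", "sqrt", "log2", "0.5", "3"]:
    print(s, features_per_split(s, num_features=16))
```

Considering fewer features per split decorrelates the trees, which is the main reason a forest usually outperforms a single tree trained on all features.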

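Methods and attributes of the fitted model

featureImportances: same as DecisionTreeClassifier; see Hastie et al., "The Elements of Statistical Learning"
getNumTrees: number of trees
transform(dataset, params=None)
treeWeights: returns the weight of each tree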
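
One common way for a forest to combine its trees is to add up the class distributions predicted by the individual trees and normalize the result. The sketch below illustrates that idea in plain Python with equal tree weights; the article does not spell out Spark's exact aggregation, so treat the details as an assumption. The per-tree probabilities are made up.

```python
def forest_predict(tree_probabilities):
    """Combine per-tree class distributions into (raw, probability, prediction)."""
    num_classes = len(tree_probabilities[0])
    # Raw prediction: sum of the per-tree probabilities for each class
    raw = [sum(tree[c] for tree in tree_probabilities) for c in range(num_classes)]
    total = sum(raw)
    probability = [r / total for r in raw]
    prediction = float(max(range(num_classes), key=lambda c: probability[c]))
    return raw, probability, prediction


# Three hypothetical trees voting on a two-class problem
trees = [[1.0, 0.0], [0.8, 0.2], [0.3, 0.7]]
raw, probability, prediction = forest_predict(trees)
print(raw, probability, prediction)
```

Because the raw values are unnormalized sums, only their ordering matters for the final class, which is why the example below compares argmax positions rather than exact values.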

Code

import numpy
from numpy import allclose
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import StringIndexer
from pyspark.ml.classification import RandomForestClassifier, RandomForestClassificationModel
df = spark.createDataFrame([
    (1.0, Vectors.dense(1.0)),
    (0.0, Vectors.sparse(1, [], []))], ["label", "features"])
stringIndexer = StringIndexer(inputCol="label", outputCol="indexed")
si_model = stringIndexer.fit(df)
td = si_model.transform(df)
rf = RandomForestClassifier(numTrees=3, maxDepth=2, labelCol="indexed", seed=42)
model = rf.fit(td)
model.featureImportances

#SparseVector(1, {0: 1.0})
allclose(model.treeWeights, [1.0, 1.0, 1.0])
#True
test0 = spark.createDataFrame([(Vectors.dense(-1.0),)], ["features"])
result = model.transform(test0).head()
result.prediction
#0.0
numpy.argmax(result.probability)
#0
numpy.argmax(result.rawPrediction)
#0
test1 = spark.createDataFrame([(Vectors.sparse(1, [0], [1.0]),)], ["features"])
model.transform(test1).head().prediction
#1.0
model.trees
#[DecisionTreeClassificationModel (uid=...) of depth..., DecisionTreeClassificationModel...]
rfc_path = temp_path + "/rfc"
rf.save(rfc_path)
rf2 = RandomForestClassifier.load(rfc_path)
rf2.getNumTrees()
#3
model_path = temp_path + "/rfc_model"
model.save(model_path)
model2 = RandomForestClassificationModel.load(model_path)
model.featureImportances == model2.featureImportances
#True

GBTClassifier

class pyspark.ml.classification.GBTClassifier(self, featuresCol="features", labelCol="label", predictionCol="prediction", maxDepth=5, maxBins=32, minInstancesPerNode=1, minInfoGain=0.0, maxMemoryInMB=256, cacheNodeIds=False, checkpointInterval=10, lossType="logistic", maxIter=20, stepSize=0.1, seed=None, subsamplingRate=1.0)

This classifier is implemented following:
J.H. Friedman. “Stochastic Gradient Boosting.” 1999.

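Parameters

checkpointInterval: same as DecisionTreeClassifier
fit(dataset, params=None)
lossType: loss function that GBT minimizes; option: "logistic"
maxBins: same as DecisionTreeClassifier
maxDepth: same as DecisionTreeClassifier
maxIter: maximum number of iterations (>= 0); each iteration adds one tree to the ensemble
minInfoGain: same as DecisionTreeClassifier
minInstancesPerNode: same as DecisionTreeClassifier
stepSize: step size (learning rate) applied at each boosting iteration
subsamplingRate: same as RandomForestClassifier
Each parameter has getter and setter methods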

Methods and attributes of the fitted model

featureImportances: same as DecisionTreeClassifier
getNumTrees: same as RandomForestClassifier
totalNumNodes: total number of nodes, summed over all trees
transform(dataset, params=None)
treeWeights: same as RandomForestClassifier
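
Unlike a random forest, a boosted ensemble weights its trees unequally. The plain-Python sketch below assumes that the first tree gets weight 1.0 and every later tree is shrunk by stepSize, which is consistent with the treeWeights values checked in the code section of this article, and that class 1 is predicted when the weighted sum of tree outputs is positive. The tree outputs are hypothetical.

```python
def gbt_tree_weights(max_iter, step_size=0.1):
    """Assumed weighting: 1.0 for the first tree, step_size for each later tree."""
    return [1.0] + [step_size] * (max_iter - 1)


def gbt_predict(tree_outputs, tree_weights):
    """Return (margin, prediction) from per-tree regression outputs."""
    margin = sum(w * out for w, out in zip(tree_weights, tree_outputs))
    prediction = 1.0 if margin > 0 else 0.0
    return margin, prediction


weights = gbt_tree_weights(max_iter=5, step_size=0.1)
print(weights)
print(gbt_predict([1.0, 0.5, 0.4, 0.3, 0.2], weights))       # positive margin, class 1.0
print(gbt_predict([-1.0, -0.5, -0.4, -0.3, -0.2], weights))  # negative margin, class 0.0
```

A smaller stepSize makes each additional tree contribute less, so more iterations are usually needed to reach the same fit.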

Code

from numpy import allclose
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import StringIndexer
from pyspark.ml.classification import GBTClassifier, GBTClassificationModel
df = spark.createDataFrame([(1.0, Vectors.dense(1.0)),(0.0,Vectors.sparse(1, [], []))], ["label", "features"])
stringIndexer = StringIndexer(inputCol="label", outputCol="indexed")
si_model = stringIndexer.fit(df)
td = si_model.transform(df)
gbt = GBTClassifier(maxIter=5, maxDepth=2, labelCol="indexed", seed=42)
model = gbt.fit(td)
model.featureImportances
#SparseVector(1, {0: 1.0})
allclose(model.treeWeights, [1.0, 0.1, 0.1, 0.1, 0.1])
#True
test0 = spark.createDataFrame([(Vectors.dense(-1.0),)], ["features"])
model.transform(test0).head().prediction
#0.0
test1 = spark.createDataFrame([(Vectors.sparse(1, [0], [1.0]),)], ["features"])
model.transform(test1).head().prediction
#1.0
model.totalNumNodes
#15
print(model.toDebugString)
#GBTClassificationModel (uid=...)...with 5 trees...
gbtc_path = temp_path + "/gbtc"
gbt.save(gbtc_path)
gbt2 = GBTClassifier.load(gbtc_path)
gbt2.getMaxDepth()
#2
model_path = temp_path + "/gbtc_model"
model.save(model_path)
model2 = GBTClassificationModel.load(model_path)
model.featureImportances == model2.featureImportances
#True
model.treeWeights == model2.treeWeights
#True
model.trees
#[DecisionTreeRegressionModel (uid=...) of depth..., DecisionTreeRegressionModel...]

NaiveBayes

class pyspark.ml.classification.NaiveBayes(self, featuresCol="features", labelCol="label", predictionCol="prediction", probabilityCol="probability", rawPredictionCol="rawPrediction", smoothing=1.0, modelType="multinomial", thresholds=None, weightCol=None)

Parameters

fit(dataset, params=None)
modelType: options are "multinomial" and "bernoulli"
smoothing: smoothing parameter; should be >= 0, default 1.0
Each parameter has setter and getter methods

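Methods and attributes of the fitted model

numClasses: number of classes
numFeatures: number of features the model was trained on; returns -1 if unknown
pi: log of the class priors
theta: log of the class-conditional probabilities
transform(dataset, params=None)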
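
The meaning of pi and theta becomes clearer when they are computed by hand. The following pure-Python sketch fits a weighted multinomial naive Bayes model with additive smoothing on the same three rows used in the code section of this article; its results agree with the truncated pi, theta and rawPrediction values shown there. The function names are invented and the formulas are an assumed reconstruction, not Spark's source code.

```python
import math


def fit_multinomial_nb(rows, smoothing=1.0):
    """rows: (label, weight, features) tuples. Returns (pi, theta) as log values."""
    labels = sorted({label for label, _, _ in rows})
    num_features = len(rows[0][2])
    class_weight = {c: 0.0 for c in labels}
    feature_sum = {c: [0.0] * num_features for c in labels}
    for label, weight, features in rows:
        class_weight[label] += weight
        for j, value in enumerate(features):
            feature_sum[label][j] += weight * value
    # pi: smoothed log prior of each class
    pi_denominator = math.log(sum(class_weight.values()) + len(labels) * smoothing)
    pi = [math.log(class_weight[c] + smoothing) - pi_denominator for c in labels]
    # theta: smoothed log probability of each feature given the class
    theta = []
    for c in labels:
        denominator = math.log(sum(feature_sum[c]) + num_features * smoothing)
        theta.append([math.log(s + smoothing) - denominator for s in feature_sum[c]])
    return pi, theta


def raw_prediction(features, pi, theta):
    """Log prior plus the feature-weighted log likelihood, per class."""
    return [p + sum(t * x for t, x in zip(row, features)) for p, row in zip(pi, theta)]


rows = [(0.0, 0.1, [0.0, 0.0]), (0.0, 0.5, [0.0, 1.0]), (1.0, 1.0, [1.0, 0.0])]
pi, theta = fit_multinomial_nb(rows, smoothing=1.0)
print(pi)
print(theta)
print(raw_prediction([1.0, 0.0], pi, theta))
```

Setting smoothing to 0 would make any feature never seen with a class produce a log of zero, which is why a positive smoothing value is the default.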

Code

from pyspark.sql import Row
from pyspark.ml.linalg import Vectors
from pyspark.ml.classification import NaiveBayes, NaiveBayesModel
df = spark.createDataFrame([
    Row(label=0.0, weight=0.1, features=Vectors.dense([0.0, 0.0])),
    Row(label=0.0, weight=0.5, features=Vectors.dense([0.0, 1.0])),
    Row(label=1.0, weight=1.0, features=Vectors.dense([1.0, 0.0]))])

nb = NaiveBayes(smoothing=1.0, modelType="multinomial", weightCol="weight")
model = nb.fit(df)
model.pi
#DenseVector([-0.81..., -0.58...])
model.theta
#DenseMatrix(2, 2, [-0.91..., -0.51..., -0.40..., -1.09...], 1)
test0 = sc.parallelize([Row(features=Vectors.dense([1.0, 0.0]))]).toDF()
result = model.transform(test0).head()
result.prediction
#1.0
result.probability
#DenseVector([0.32..., 0.67...])
result.rawPrediction
#DenseVector([-1.72..., -0.99...])
test1 = sc.parallelize([Row(features=Vectors.sparse(2, [0], [1.0]))]).toDF()
model.transform(test1).head().prediction
#1.0
nb_path = temp_path + "/nb"
nb.save(nb_path)
nb2 = NaiveBayes.load(nb_path)
nb2.getSmoothing()
#1.0
model_path = temp_path + "/nb_model"
model.save(model_path)
model2 = NaiveBayesModel.load(model_path)
model.pi == model2.pi
#True
model.theta == model2.theta
#True
nb = nb.setThresholds([0.01, 10.00])
model3 = nb.fit(df)
result = model3.transform(test0).head()
result.prediction
#0.0
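
The last step above flips the prediction from 1.0 to 0.0 purely by setting thresholds. A short plain-Python sketch of the rule helps explain why: the predicted class is the one with the largest value of p / t, where p is the class probability and t is that class's threshold. The probabilities below are the truncated values printed earlier, so they are approximate, and the helper name is invented.

```python
def predict_with_thresholds(probability, thresholds):
    """Pick the class with the largest probability-to-threshold ratio."""
    scaled = [p / t for p, t in zip(probability, thresholds)]
    return float(max(range(len(scaled)), key=lambda i: scaled[i]))


probability = [0.32, 0.67]  # approximate values from result.probability above

print(predict_with_thresholds(probability, [1.0, 1.0]))    # equal thresholds: class 1.0
print(predict_with_thresholds(probability, [0.01, 10.0]))  # class 1 heavily penalized: class 0.0
```

A large threshold for a class makes the model reluctant to predict it, which is useful when false positives for that class are costly.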