
Spark MLlib Logistic Regression Algorithm

1.1 The Logistic Regression Algorithm

1.1.1 Basic Theory

Logistic regression is essentially linear regression with one extra function applied in the mapping from features to result: the features are first combined linearly, and the function g(z) is then used as the hypothesis function to produce the prediction. g(z) maps continuous values into the range between 0 and 1.

It differs from linear regression in that the output of linear regression ranges over a very wide interval, from negative infinity to positive infinity, and has to be compressed into the interval between 0 and 1 before it can reasonably be interpreted as a "probability". Compressing large values into this range has another benefit: it dampens the influence of extreme, outlying variables.

The logistic function (also called the sigmoid function) has the form:

g(z) = 1 / (1 + e^{-z})

The sigmoid curve has a characteristic "S" shape: it approaches 0 as z → −∞ and approaches 1 as z → +∞.
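A minimal Scala sketch of this mapping (illustrative only, not MLlib code):

// Sigmoid: maps any real number into the open interval (0, 1).
def sigmoid(z: Double): Double = 1.0 / (1.0 + math.exp(-z))

// A few sample points tracing the "S" shape.
Seq(-6.0, -2.0, 0.0, 2.0, 6.0).foreach { z =>
  println(f"g($z%5.1f) = ${sigmoid(z)}%.4f")  // ≈ 0.0025, 0.1192, 0.5000, 0.8808, 0.9975
}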

Given n features x = (x_1, x_2, …, x_n), let the conditional probability P(y=1|x) be the probability that the event y occurs given the observed factors x. Expressed with the sigmoid function:

P(y=1|x) = h_θ(x) = g(θ^T x) = 1 / (1 + e^{-θ^T x})

The probability that y does not occur given x is then:

P(y=0|x) = 1 − h_θ(x) = e^{-θ^T x} / (1 + e^{-θ^T x})

Now suppose we have m mutually independent observations y = (y_1, y_2, …, y_m). The probability of a single observation y_i (with y_i ∈ {0, 1}) can be written as:

P(y_i | x_i; θ) = h_θ(x_i)^{y_i} · (1 − h_θ(x_i))^{1 − y_i}

When y = 1 the second factor disappears, leaving only the probability that x belongs to class 1; when y = 0 the first factor disappears, leaving only the probability that x belongs to class 0 (one minus the probability that it belongs to class 1). So whichever value y takes, the expression above is the probability of observing (x, y). The likelihood of the whole sample set, i.e. of the m independent samples, is therefore the product of their individual probabilities:

L(θ) = ∏_{i=1}^{m} h_θ(x_i)^{y_i} · (1 − h_θ(x_i))^{1 − y_i}

Our goal is to find the parameter estimate that makes this likelihood as large as possible; the maximum likelihood estimate is the θ for which L(θ) attains its maximum. Taking the logarithm of L(θ) gives the log-likelihood:

l(θ) = log L(θ) = ∑_{i=1}^{m} [ y_i · log h_θ(x_i) + (1 − y_i) · log(1 − h_θ(x_i)) ]

Differentiating l(θ) with respect to θ gives:

∂l(θ)/∂θ_j = ∑_{i=1}^{m} ( y_i − h_θ(x_i) ) · x_i^(j)

1.1.2 Gradient Descent Algorithm

The θ update rule is:

θ_j := θ_j − α · ∂J(θ)/∂θ_j

where J(θ) = −(1/m) · l(θ) is the averaged negative log-likelihood and α is the step size (learning rate). Substituting the gradient derived above, the update can be written as:

θ_j := θ_j − (α/m) · ∑_{i=1}^{m} ( h_θ(x_i) − y_i ) · x_i^(j)
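For contrast with the vectorized form introduced below, here is a minimal sketch of this update written with explicit loops; the names (gradientDescentStep, x, y, theta, alpha) are illustrative and this is not MLlib code:

// One batch gradient descent update of θ, mirroring the summation above.
// x: m samples with n features, y: labels in {0, 1}, alpha: learning rate.
def gradientDescentStep(x: Array[Array[Double]], y: Array[Double],
                        theta: Array[Double], alpha: Double): Array[Double] = {
  val m = x.length
  val next = theta.clone()
  for (j <- theta.indices) {
    var sum = 0.0
    for (i <- 0 until m) {                                   // the Σ over all m samples
      val margin = x(i).zip(theta).map { case (xi, t) => xi * t }.sum
      val h = 1.0 / (1.0 + math.exp(-margin))                // hθ(x_i)
      sum += (h - y(i)) * x(i)(j)                            // (hθ(x_i) − y_i) · x_i^(j)
    }
    next(j) = theta(j) - alpha / m * sum                     // θ_j := θ_j − (α/m)·Σ…
  }
  next
}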

 

Vectorization

Vectorization means replacing for-loops with matrix operations, which simplifies the computation and improves efficiency.

In the update rule above, Σ(…) is a summation, which clearly requires a for-loop over all m samples, so the update as written is not yet fully vectorized.

The vectorization proceeds as follows.

By convention, the training data is arranged in matrix form: each row of x is one training sample and each column holds the values of one feature, so x is the m×n design matrix, y the m×1 label column vector, and θ the n×1 weight column vector.

The argument A of g(A) is a column vector, so the implementation of g must accept a column vector as its argument and return a column vector. From the expression above, all of the predictions h_θ(x_i) = g(x·θ) can therefore be obtained in a single computation.

The θ update can then be rewritten in matrix form as:

θ := θ − (α/m) · x^T · ( g(x·θ) − y )

In summary, after vectorization the θ update proceeds in three steps:

(1) Compute A = x · θ

(2) Compute E = g(A) − y

(3) Compute θ := θ − (α/m) · x^T · E
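The three steps above map directly onto matrix operations. A minimal sketch using Breeze (the linear algebra library MLlib uses internally); the function and variable names (vectorizedStep, x, y, theta, alpha) are illustrative, not MLlib API:

import breeze.linalg.{DenseMatrix, DenseVector}
import breeze.numerics.sigmoid

// Vectorized update: x is the m×n design matrix, y the m-vector of labels,
// theta the n-vector of weights, alpha the learning rate.
def vectorizedStep(x: DenseMatrix[Double], y: DenseVector[Double],
                   theta: DenseVector[Double], alpha: Double): DenseVector[Double] = {
  val a = x * theta                      // (1) A = x · θ        (m-vector of margins)
  val e = sigmoid(a) - y                 // (2) E = g(A) − y     (m-vector of errors)
  theta - (x.t * e) * (alpha / x.rows)   // (3) θ := θ − (α/m) · xᵀ · E
}

No explicit loop over the m samples remains; the summation is carried out by the matrix-vector products.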

1.1.3 Regularization

In practice, to improve the generalization ability of the model and prevent overfitting — especially with a large number of sparse features, where model complexity is high and dimensionality reduction is needed — we add a regularization term on top of minimizing the training error in order to reduce model complexity. In logistic regression, L1 and L2 regularization are used.

The loss function is:

J(θ) = −(1/m) · ∑_{i=1}^{m} [ y_i · log h_θ(x_i) + (1 − y_i) · log(1 − h_θ(x_i)) ]

A regularization term is added to this loss: the L1 or L2 norm of the weights multiplied by a coefficient λ, which controls the balance between the loss term and the regularization term. Intuitively, preventing overfitting means preventing the trained model from relying too heavily on any single feature. When minimizing the loss, a very large weight in one dimension can make the fitted values match the true values very closely; regularization makes the overall cost larger in that case, so the model avoids depending too much on any single dimension. Of course, regularization presupposes that the feature values have been normalized.

For linear regression, the regularized objective adds the penalty term (λ/2m)·∑_{j=1}^{n} θ_j² to the squared-error loss:

J(θ) = (1/2m) · [ ∑_{i=1}^{m} ( h_θ(x_i) − y_i )² + λ · ∑_{j=1}^{n} θ_j² ]

Regularized logistic regression is in fact very similar to regularized linear regression.

Gradient descent is applied in the same way; with the L2 penalty the update becomes:

θ_j := θ_j − α · [ (1/m) · ∑_{i=1}^{m} ( h_θ(x_i) − y_i ) · x_i^(j) + (λ/m) · θ_j ]    (for j ≥ 1; the intercept θ_0 is not regularized)
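A hedged Breeze sketch of this regularized update, mirroring the formula above; the names are illustrative, and for brevity the sketch penalizes every component rather than excluding the intercept:

import breeze.linalg.{DenseMatrix, DenseVector}
import breeze.numerics.sigmoid

// L2-regularized batch gradient descent step.
// x: m×n design matrix, y: labels, theta: weights, alpha: learning rate, lambda: L2 penalty.
def regularizedStep(x: DenseMatrix[Double], y: DenseVector[Double],
                    theta: DenseVector[Double], alpha: Double,
                    lambda: Double): DenseVector[Double] = {
  val m = x.rows.toDouble
  val grad = (x.t * (sigmoid(x * theta) - y)) / m   // data term: (1/m) · xᵀ(g(xθ) − y)
  theta - (grad + theta * (lambda / m)) * alpha     // θ := θ − α · [grad + (λ/m)·θ]
}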

1.2 Spark MLlib Logistic Regression Source Code Analysis

1.2.1 LogisticRegressionWithSGD

The entry point of the logistic regression algorithm is the train method: the companion object of LogisticRegressionWithSGD defines train functions, and each train function constructs a new LogisticRegressionWithSGD object.

package org.apache.spark.mllib.classification

// 1. Class: LogisticRegressionWithSGD
class LogisticRegressionWithSGD private[mllib] (
    private var stepSize: Double,
    private var numIterations: Int,
    private var regParam: Double,
    private var miniBatchFraction: Double)
  extends GeneralizedLinearAlgorithm[LogisticRegressionModel] with Serializable {

  private val gradient = new LogisticGradient()
  private val updater = new SquaredL2Updater()
  override val optimizer = new GradientDescent(gradient, updater)
    .setStepSize(stepSize)
    .setNumIterations(numIterations)
    .setRegParam(regParam)
    .setMiniBatchFraction(miniBatchFraction)
  override protected val validators = List(DataValidators.binaryLabelValidator)

  /**
   * Construct a LogisticRegression object with default parameters: {stepSize: 1.0,
   * numIterations: 100, regParam: 0.01, miniBatchFraction: 1.0}.
   */
  def this() = this(1.0, 100, 0.01, 1.0)

  override protected[mllib] def createModel(weights: Vector, intercept: Double) = {
    new LogisticRegressionModel(weights, intercept)
  }
}

Parameters and members of the LogisticRegressionWithSGD class:

stepSize: step size of each gradient descent iteration, default 1.0

numIterations: number of iterations, default 100

regParam: regularization parameter, 0.01 in the default constructor (the train helper shown below passes 0.0)

miniBatchFraction: fraction of the samples used in each iteration, default 1.0

gradient: LogisticGradient(), the logistic (cross-entropy) gradient;

updater: SquaredL2Updater(), L2-norm regularization;

optimizer: GradientDescent(gradient, updater), gradient descent optimization.
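Since the primary constructor is private[mllib], callers normally go through the train helpers of the companion object (shown next); alternatively, the public no-argument constructor can be used and the exposed optimizer re-configured. A hedged sketch with illustrative parameter values:

import org.apache.spark.mllib.classification.LogisticRegressionWithSGD

// The no-argument constructor uses the defaults listed above;
// the exposed optimizer can be re-configured before calling run.
val lr = new LogisticRegressionWithSGD()
lr.optimizer
  .setNumIterations(200)
  .setStepSize(0.5)
  .setRegParam(0.01)
  .setMiniBatchFraction(0.8)
// val model = lr.run(trainingData)   // trainingData: RDD[LabeledPoint]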

// 2. The train method
object LogisticRegressionWithSGD {

  /**
   * Train a logistic regression model given an RDD of (label, features) pairs. We run a fixed
   * number of iterations of gradient descent using the specified step size. Each iteration uses
   * `miniBatchFraction` fraction of the data to calculate the gradient. The weights used in
   * gradient descent are initialized using the initial weights provided.
   * NOTE: Labels used in Logistic Regression should be {0, 1}
   *
   * @param input RDD of (label, array of features) pairs.
   * @param numIterations Number of iterations of gradient descent to run.
   * @param stepSize Step size to be used for each iteration of gradient descent.
   * @param miniBatchFraction Fraction of data to be used per iteration.
   * @param initialWeights Initial set of weights to be used. Array should be equal in size to
   *        the number of features in the data.
   */
  def train(
      input: RDD[LabeledPoint],
      numIterations: Int,
      stepSize: Double,
      miniBatchFraction: Double,
      initialWeights: Vector): LogisticRegressionModel = {
    new LogisticRegressionWithSGD(stepSize, numIterations, 0.0, miniBatchFraction)
      .run(input, initialWeights)
  }
}

Parameters of train:

input: training samples; the class label must be 0.0 or 1.0 and the features are of type double

numIterations: number of iterations, default 100

stepSize: step size of each iteration, default 1.0

miniBatchFraction: fraction of the samples used in each iteration, default 1.0

initialWeights: initial weight vector, default the zero vector
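A hedged end-to-end usage sketch of this train method; the data path and parameter values are illustrative:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.classification.LogisticRegressionWithSGD
import org.apache.spark.mllib.util.MLUtils

val sc = new SparkContext(new SparkConf().setAppName("LRExample").setMaster("local[2]"))

// LIBSVM-formatted data; labels must be 0.0 or 1.0.
val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt").cache()

// Train with 100 iterations of mini-batch SGD (step size 1.0, full batch by default).
val model = LogisticRegressionWithSGD.train(data, 100)

// Predict the class of the first example and compare it with the true label.
val first = data.first()
println(s"prediction = ${model.predict(first.features)}, label = ${first.label}")
sc.stop()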

The run method is inherited from the parent class GeneralizedLinearAlgorithm and is implemented as follows.

1.2.2 GeneralizedLinearAlgorithm

The implementation of the run method invoked from LogisticRegressionWithSGD.

package org.apache.spark.mllib.regression

/**
 * Run the algorithm with the configured parameters on an input RDD
 * of LabeledPoint entries starting from the initial weights provided.
 */
def run(input: RDD[LabeledPoint], initialWeights: Vector): M = {

  // Determine the feature dimension if it has not been set yet.
  if (numFeatures < 0) {
    numFeatures = input.map(_.features.size).first()
  }

  // Warn if the input RDD is not cached.
  if (input.getStorageLevel == StorageLevel.NONE) {
    logWarning("The input data is not directly cached, which may hurt performance if its"
      + " parent RDDs are also uncached.")
  }

  // Check the data properties before running the optimizer.
  if (validateData && !validators.forall(func => func(input))) {
    throw new SparkException("Input validation failed.")
  }

  val scaler = if (useFeatureScaling) {
    new StandardScaler(withStd = true, withMean = false).fit(input.map(_.features))
  } else {
    null
  }

  // Transform the input samples into (label, features) pairs.
  // addIntercept: whether to add the constant term θ0; if so, an extra component x0 = 1 is appended.
  // Prepend an extra variable consisting of all 1.0's for the intercept.
  // TODO: Apply feature scaling to the weight vector instead of input data.
  val data =
    if (addIntercept) {
      if (useFeatureScaling) {
        input.map(lp => (lp.label, appendBias(scaler.transform(lp.features)))).cache()
      } else {
        input.map(lp => (lp.label, appendBias(lp.features))).cache()
      }
    } else {
      if (useFeatureScaling) {
        input.map(lp => (lp.label, scaler.transform(lp.features))).cache()
      } else {
        input.map(lp => (lp.label, lp.features))
      }
    }

  // Initialize the weights.
  // addIntercept: if the intercept term θ0 is used, the weight vector is extended by one entry.
  /**
   * TODO: For better convergence, in logistic regression, the intercepts should be computed
   * from the prior probability distribution of the outcomes; for linear regression,
   * the intercept should be set as the average of response.
   */
  val initialWeightsWithIntercept = if (addIntercept && numOfLinearPredictor == 1) {
    appendBias(initialWeights)
  } else {
    /** If `numOfLinearPredictor > 1`, initialWeights already contains intercepts. */
    initialWeights
  }

  // Optimize the weights with gradient descent and return the best weight vector.
  val weightsWithIntercept = optimizer.optimize(data, initialWeightsWithIntercept)

  val intercept = if (addIntercept && numOfLinearPredictor == 1) {
    weightsWithIntercept(weightsWithIntercept.size - 1)
  } else {
    0.0
  }

  var weights = if (addIntercept && numOfLinearPredictor == 1) {
    Vectors.dense(weightsWithIntercept.toArray.slice(0, weightsWithIntercept.size - 1))
  } else {
    weightsWithIntercept
  }

  createModel(weights, intercept)
}

The call optimizer.optimize(data, initialWeightsWithIntercept) is the core of the logistic regression implementation.

The optimizer has type GradientDescent; its optimize method mainly calls runMiniBatchSGD on the GradientDescent companion object, which returns the optimal feature weight vector produced by the iterations.

The optimize method of GradientDescent is implemented as follows.

1.2.3 GradientDescent

The optimize method is implemented as follows.

package org.apache.spark.mllib.optimization

/**
 * :: DeveloperApi ::
 * Runs gradient descent on the given training data.
 * @param data training data
 * @param initialWeights initial weights
 * @return solution vector
 */
@DeveloperApi
def optimize(data: RDD[(Double, Vector)], initialWeights: Vector): Vector = {
  val (weights, _) = GradientDescent.runMiniBatchSGD(
    data,
    gradient,
    updater,
    stepSize,
    numIterations,
    regParam,
    miniBatchFraction,
    initialWeights)
  weights
}

The optimize method calls GradientDescent.runMiniBatchSGD, which is implemented as follows:

/**
 * Run stochastic gradient descent (SGD) in parallel using mini batches.
 * In each iteration, we sample a subset (fraction miniBatchFraction) of the total data
 * in order to compute a gradient estimate.
 * Sampling, and averaging the subgradients over this subset is performed using one standard
 * spark map-reduce in each iteration.
 *
 * @param data - Input data for SGD. RDD of the set of data examples, each of
 *               the form (label, [feature values]).
 * @param gradient - Gradient object (used to compute the gradient of the loss function of
 *                   one single data example)
 * @param updater - Updater function to actually perform a gradient step in a given direction.
 * @param stepSize - initial step size for the first step
 * @param numIterations - number of iterations that SGD should be run.
 * @param regParam - regularization parameter
 * @param miniBatchFraction - fraction of the input data set that should be used for
 *                            one iteration of SGD. Default value 1.0.
 *
 * @return A tuple containing two elements. The first element is a column matrix containing
 *         weights for every feature, and the second element is an array containing the
 *         stochastic loss computed for every iteration.
 */
def runMiniBatchSGD(
    data: RDD[(Double, Vector)],
    gradient: Gradient,
    updater: Updater,
    stepSize: Double,
    numIterations: Int,
    regParam: Double,
    miniBatchFraction: Double,
    initialWeights: Vector): (Vector, Array[Double]) = {

  // History of the loss computed in every iteration.
  val stochasticLossHistory = new ArrayBuffer[Double](numIterations)

  // If the data set is empty, return the initial weights.
  val numExamples = data.count()
  // if no data, return initial weights to avoid NaNs
  if (numExamples == 0) {
    logWarning("GradientDescent.runMiniBatchSGD returning initial weights, no data found")
    return (initialWeights, stochasticLossHistory.toArray)
  }

  // Sanity check on miniBatchFraction.
  if (numExamples * miniBatchFraction < 1) {
    logWarning("The miniBatchFraction is too small")
  }

  // Initialize weights as a column vector.
  var weights = Vectors.dense(initialWeights.toArray)
  val n = weights.size

  /**
   * For the first iteration, the regVal will be initialized as sum of weight squares
   * if it's L2 updater; for L1 updater, the same logic is followed.
   */
  var regVal = updater.compute(
    weights, Vectors.dense(new Array[Double](weights.size)), 0, 1, regParam)._2

  // Iteratively update the weights.
  for (i <- 1 to numIterations) {
    val bcWeights = data.context.broadcast(weights)
    // Sample a subset (fraction miniBatchFraction) of the total data and
    // compute and sum up the subgradients on this subset (this is one map-reduce).
    // sample draws a random subset of the examples according to miniBatchFraction;
    // treeAggregate computes the gradient and loss of every sampled example and sums them up.
    val (gradientSum, lossSum, miniBatchSize) = data.sample(false, miniBatchFraction, 42 + i)
      .treeAggregate((BDV.zeros[Double](n), 0.0, 0L))(
        seqOp = (c, v) => {
          // c: (grad, loss, count), v: (label, features)
          val l = gradient.compute(v._2, v._1, bcWeights.value, Vectors.fromBreeze(c._1))
          (c._1, c._2 + l, c._3 + 1)
        },
        combOp = (c1, c2) => {
          // c: (grad, loss, count)
          (c1._1 += c2._1, c1._2 + c2._2, c1._3 + c2._3)
        })

    // Record the loss of this iteration and update the weight vector.
    if (miniBatchSize > 0) {
      /**
       * NOTE(Xinghao): lossSum is computed using the weights from the previous iteration
       * and regVal is the regularization value computed in the previous iteration as well.
       */
      // updater.compute returns the updated weights and the new regularization value,
      // based on the averaged gradient of this mini batch, the step size, the iteration
      // number and regParam.
      stochasticLossHistory.append(lossSum / miniBatchSize + regVal)
      val update = updater.compute(
        weights, Vectors.fromBreeze(gradientSum / miniBatchSize.toDouble), stepSize, i, regParam)
      weights = update._1
      regVal = update._2
    } else {
      logWarning(s"Iteration ($i/$numIterations). The size of sampled batch is zero")
    }
  }

  logInfo("GradientDescent.runMiniBatchSGD finished. Last 10 stochastic losses %s".format(
    stochasticLossHistory.takeRight(10).mkString(", ")))

  (weights, stochasticLossHistory.toArray)
}

Input and output parameters of runMiniBatchSGD:

data: input samples for SGD, in the form (label, [feature values])

gradient: gradient object, used to compute the gradient and loss of each sample

updater: updater object, used to update the weights in each iteration

stepSize: initial step size

numIterations: number of iterations

regParam: regularization parameter

miniBatchFraction: fraction of the input data used in each iteration

The return value is a (Vector, Array[Double]) tuple: the first element is the weight vector, and the second is the loss computed in every iteration.
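For reference, a hedged Breeze sketch of what one updater.compute call amounts to in the L2 case used here: the step size decays as stepSize / sqrt(iter), the weights are shrunk by the L2 penalty and then moved along the averaged gradient, and the second return value is the regularization term added to the loss. Names are illustrative; this is not the SquaredL2Updater source.

import breeze.linalg.{norm, DenseVector}

// w_new = w · (1 − η·λ) − η · grad, with η = stepSize / sqrt(iter);
// the second element of the result is 0.5 · λ · ‖w_new‖².
def l2Update(weightsOld: DenseVector[Double], gradient: DenseVector[Double],
             stepSize: Double, iter: Int, regParam: Double): (DenseVector[Double], Double) = {
  val thisIterStepSize = stepSize / math.sqrt(iter)
  val weightsNew = weightsOld * (1.0 - thisIterStepSize * regParam) - gradient * thisIterStepSize
  val n = norm(weightsNew)
  (weightsNew, 0.5 * regParam * n * n)
}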

runMiniBatchSGD repeatedly samples the input data set; using LogisticGradient to compute the gradients and SquaredL2Updater to update the weights, it iterates over the sampled data until it finds the optimal feature weight vector. These components are defined in LogisticRegressionWithSGD as follows:

private val gradient = new LogisticGradient()
private val updater = new SquaredL2Updater()
override val optimizer = new GradientDescent(gradient, updater)
  .setStepSize(stepSize)
  .setNumIterations(numIterations)
  .setRegParam(regParam)
  .setMiniBatchFraction(miniBatchFraction)

runMiniBatchSGD calls two methods, gradient.compute and updater.compute; their implementations are shown below.

1.2.4 gradient & updater

1)gradient

// Compute the difference between the predicted value and the actual class label of the current sample.

// Compute the gradient contribution of the current sample.

// Accumulate it into the weight update (cumGradient).

// Return the gradient vector and the loss of the current training sample.

class LogisticGradient(numClasses: Int) extends Gradient {

  def this() = this(2)

  override def compute(data: Vector, label: Double, weights: Vector): (Vector, Double) = {
    val gradient = Vectors.zeros(weights.size)
    val loss = compute(data, label, weights, gradient)
    (gradient, loss)
  }

  override def compute(
      data: Vector,
      label: Double,
      weights: Vector,
      cumGradient: Vector): Double = {
    val dataSize = data.size

    // (weights.size / dataSize + 1) is number of classes
    require(weights.size % dataSize == 0 && numClasses == weights.size / dataSize + 1)
    numClasses match {
      case 2 =>
        /**
         * For Binary Logistic Regression.
         *
         * Although the loss and gradient calculation for multinomial one is more generalized,
         * and multinomial one can also be used in binary case, we still implement a specialized
         * binary version for performance reason.
         */
        val margin = -1.0 * dot(data, weights)
        val multiplier = (1.0 / (1.0 + math.exp(margin))) - label
        axpy(multiplier, data, cumGradient)
        if (label > 0) {
          // The following is equivalent to log(1 + exp(margin)) but more numerically stable.
          MLUtils.log1pExp(margin)
        } else {
          MLUtils.log1pExp(margin) - margin
        }
      case _ =>
        /**
         * For Multinomial Logistic Regression.
         */
        val weightsArray = weights match {
          case dv: DenseVector => dv.values
          case _ =>
            throw new IllegalArgumentException(
              s"weights only supports dense vector but got type ${weights.getClass}.")
        }
        val cumGradientArray = cumGradient match {
          case dv: DenseVector => dv.values
          case _ =>
            throw new IllegalArgumentException(
              s"cumGradient only supports dense vector but got type ${cumGradient.getClass}.")
        }
        // marginY is margins(label - 1) in the formula.
        var marginY = 0.0
        var maxMargin = Double.NegativeInfinity
        var maxMarginIndex = 0
        val margins = Array.tabulate(numClasses - 1) { i =>
          var margin = 0.0
          data.foreachActive { (index, value) =>
            if (value != 0.0) margin += value * weightsArray((i * dataSize) + index)
          }