
Spark 2.x Source Code Analysis: Logistic Regression


Note: this is my first blog post, and I welcome feedback and discussion. If any formulas do not render properly, please see the original on GitHub.

1. Binary Logistic Regression

    1.1 Introduction

    Regression learns a mapping between variables (x -> y); logistic regression additionally squashes the output into (0, 1) through the sigmoid function $\sigma(z) = \frac{1}{1+e^{-z}}$, an S-shaped curve.

    Given a feature vector $x$, the linear part of the model can be written as

    $$z = \beta^{T}x + \beta_0$$

    Logistic regression simply wraps this linear function in a sigmoid:

    $$P(y=1 \mid x;\beta) = \sigma(\beta^{T}x + \beta_0) = \frac{1}{1+e^{-(\beta^{T}x + \beta_0)}}$$

    Logistic regression therefore stays linear in the parameters and has a linear decision boundary (a hyperplane): $\beta^{T}x + \beta_0 = 0$.

    1.2 Constructing the Loss Function

    For binary classification there are only two outcomes, y = 1 or y = 0, with probabilities

    $$P(y=1 \mid x;\beta) = \frac{1}{1+e^{-\beta^{T}x}}, \qquad P(y=0 \mid x;\beta) = 1 - P(y=1 \mid x;\beta)$$

    The maximum likelihood estimate therefore maximizes

    $$L(\beta) = \prod_{i} P(y=1 \mid x_i;\beta)^{y_i}\,\bigl(1 - P(y=1 \mid x_i;\beta)\bigr)^{1-y_i}$$

    Taking the log makes the product easier to handle, and taking the negative log turns the maximization into a minimization, which gives the loss function. Writing $\mathrm{margin}_i = -\beta^{T}x_i$ (Spark's sign convention):

    $$\ell(\beta) = -\log L(\beta) = \sum_i \Bigl[\log\bigl(1+e^{\mathrm{margin}_i}\bigr) - (1-y_i)\,\mathrm{margin}_i\Bigr]$$

    It is convenient to look at one sample at a time, which is also what Spark does. For sample $i$, the first-order gradient of the loss with respect to parameter $\beta_j$ is

    $$\frac{\partial \ell_i}{\partial \beta_j} = \Bigl(\frac{1}{1+e^{\mathrm{margin}_i}} - y_i\Bigr) x_{i,j} = \mathrm{multiplier}_i \cdot x_{i,j}$$
    Tip: margin and multiplier above are named to match the variables in the Spark source code.
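To make the margin/multiplier notation concrete before diving into the source, here is a minimal standalone sketch (plain Scala, no Spark; all names are illustrative) that computes the per-sample loss and gradient exactly as written above:

```scala
// Per-sample binary logistic loss and gradient using Spark's margin/multiplier convention.
// Standalone illustration only; no standardization, intercept, or sample weight.
object BinaryLogisticGradientSketch {
  def lossAndGradient(beta: Array[Double], x: Array[Double], y: Double): (Double, Array[Double]) = {
    // margin = -(beta . x)
    val margin = -x.indices.map(j => beta(j) * x(j)).sum
    // loss = log(1 + exp(margin)) - (1 - y) * margin
    // (Spark uses MLUtils.log1pExp here for extra numerical stability)
    val loss = math.log1p(math.exp(margin)) - (1.0 - y) * margin
    // multiplier = 1 / (1 + exp(margin)) - y
    val multiplier = 1.0 / (1.0 + math.exp(margin)) - y
    // gradient_j = multiplier * x_j
    (loss, x.map(_ * multiplier))
  }

  def main(args: Array[String]): Unit = {
    val (loss, grad) = lossAndGradient(Array(0.5, -0.25), Array(1.0, 2.0), y = 1.0)
    println(s"loss=$loss gradient=${grad.mkString(", ")}")
  }
}
```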

    1.3 Regularization (Against Overfitting)

    To reduce overfitting, a regularization term is added to the loss function. Its purpose is to constrain the parameters, and it does not depend on the data:

    $$\min_{\beta}\; \ell(\beta) + \lambda\,R(\beta)$$

    The most common choices are L1 and L2 regularization, $R(\beta) = \lVert\beta\rVert_1$ and $R(\beta) = \tfrac{1}{2}\lVert\beta\rVert_2^2$. Since the L1 norm is not differentiable everywhere, it needs a dedicated solver such as OWLQN.
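In spark.ml the two penalties are blended through an elastic-net mixing parameter (this matches the `regParamL1`/`regParamL2` computation shown in the source walkthrough in section 4.1). A sketch of the resulting objective, with $\lambda$ = regParam and $\alpha$ = elasticNetParam:

```latex
% Elastic-net regularized objective as parameterized by spark.ml:
%   regParamL1 = alpha * lambda,   regParamL2 = (1 - alpha) * lambda
\min_{\beta}\;
  \ell(\beta)
  \;+\; \alpha\,\lambda\,\lVert \beta \rVert_1
  \;+\; \frac{(1-\alpha)\,\lambda}{2}\,\lVert \beta \rVert_2^2
```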





    1.4 Optimization

    For the optimization, spark.ml's LogisticRegression uses L-BFGS and OWLQN, for the L2- and L1-regularized problems respectively; see the blog post in reference [2].

2. Multinomial Logistic Regression

    2.1 The Softmax Probability

    Multinomial logistic regression in Spark 2 uses softmax (this is not exactly the same as Spark 1.6); see UFLDL (http://ufldl.stanford.edu/tutorial/supervised/SoftmaxRegression/). The class probability is defined as

    $$P(y=k \mid x;\beta) = \frac{e^{\beta_k^{T}x}}{\sum_{k'=0}^{K-1} e^{\beta_{k'}^{T}x}}, \qquad k = 0,\dots,K-1$$

    In binary logistic regression the weights form a vector; in multinomial logistic regression the weights $\beta$ form a matrix, which amounts to one binary-style linear model per class (one row of $\beta$ per class):

    $$\mathrm{margin}_k = \beta_k^{T}x$$

    The parameters of this model are over-parameterized: the same constant vector $c$ can be added to every $\beta_k$ without changing any class probability:

    $$\frac{e^{(\beta_k + c)^{T}x}}{\sum_{k'} e^{(\beta_{k'} + c)^{T}x}} = \frac{e^{c^{T}x}\,e^{\beta_k^{T}x}}{e^{c^{T}x}\sum_{k'} e^{\beta_{k'}^{T}x}} = P(y=k \mid x;\beta)$$
2.2 Loss Function and Its Derivative

For a single instance $(x_i, y_i)$ with sample weight $w_i$, write $\mathrm{margin}_k = \beta_k^{T}x_i$. The loss is the negative log-likelihood:

$$\ell_i(\beta) = -w_i \log P(y_i \mid x_i;\beta) = w_i\Bigl[\log\sum_{k'} e^{\mathrm{margin}_{k'}} \;-\; \mathrm{margin}_{y_i}\Bigr]$$

Whether the optimizer is SGD, LBFGS, or OWLQN, we need the first-order derivative of the loss with respect to the parameters:

$$\frac{\partial \ell_i}{\partial \beta_{k,j}} = w_i\,x_{i,j}\Bigl(\frac{e^{\mathrm{margin}_k}}{\sum_{k'} e^{\mathrm{margin}_{k'}}} - I_{y_i=k}\Bigr) = w_i\,x_{i,j}\,\mathrm{multiplier}_k$$

Here $w_i$ is the sample weight (ignored for now) and $I_{y_i=k}$ is an indicator: since we differentiate with respect to the parameters $\beta_k$ of class $k$, the last term of the loss contributes only when the current sample has $y_i = k$.

When $\max_k \mathrm{margin}_k > 0$, the exponentials above can overflow, so the loss is first rewritten in an equivalent, numerically safe form:

$$\ell_i(\beta) = w_i\Bigl[\log\sum_{k'} e^{\mathrm{margin}_{k'} - \mathrm{maxMargin}} \;-\; \mathrm{margin}_{y_i} + \mathrm{maxMargin}\Bigr]$$

and the multiplier accordingly becomes

$$\mathrm{multiplier}_k = \frac{e^{\mathrm{margin}_k - \mathrm{maxMargin}}}{\sum_{k'} e^{\mathrm{margin}_{k'} - \mathrm{maxMargin}}} - I_{y_i=k}$$
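A tiny standalone sketch of this max-subtraction trick (plain Scala, illustrative names only; Spark performs the subtraction only when maxMargin > 0, which is mathematically equivalent), computing the stabilized loss and multipliers for one instance:

```scala
// Numerically stable softmax loss and multipliers for a single training instance,
// following the adjusted formulas above.
object SoftmaxStableSketch {
  def lossAndMultipliers(margins: Array[Double], label: Int): (Double, Array[Double]) = {
    val maxMargin = margins.max
    // Shift every margin by maxMargin before exponentiating so exp() cannot overflow.
    val exps = margins.map(m => math.exp(m - maxMargin))
    val sum = exps.sum
    // multiplier_k = softmax_k - I(label == k)
    val multipliers = exps.indices.map { k =>
      exps(k) / sum - (if (k == label) 1.0 else 0.0)
    }.toArray
    // loss = log(sum_k exp(margin_k - maxMargin)) - margin_label + maxMargin
    val loss = math.log(sum) - margins(label) + maxMargin
    (loss, multipliers)
  }

  def main(args: Array[String]): Unit = {
    // Huge margins would overflow a naive softmax, but not the shifted version.
    val (loss, mult) = lossAndMultipliers(Array(1000.0, 1002.0, 998.0), label = 1)
    println(s"loss=$loss multipliers=${mult.mkString(", ")}")
  }
}
```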

3. Example



import org.apache.spark.ml.classification.LogisticRegression
// Load training data
val training = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
val lr = new LogisticRegression()
  .setMaxIter(10)
  .setRegParam(0.3)
  .setElasticNetParam(0.8)
// Fit the model
val lrModel = lr.fit(training)
// Print the coefficients and intercept for logistic regression
println(s"Coefficients: ${lrModel.coefficients} Intercept: ${lrModel.intercept}")
// We can also use the multinomial family for binary classification
val mlr = new LogisticRegression()
  .setMaxIter(10)
  .setRegParam(0.3)
  .setElasticNetParam(0.8)
  .setFamily("multinomial")
val mlrModel = mlr.fit(training)
// Print the coefficients and intercepts for logistic regression with multinomial family
println(s"Multinomial coefficients: ${mlrModel.coefficientMatrix}")
println(s"Multinomial intercepts: ${mlrModel.interceptVector}")
// Multiclass classification works the same way.
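After fitting, the training summary can be used to inspect the objective history produced by the optimizer (the same objectiveHistory assembled in the source walkthrough below). A short follow-up, reusing the `lrModel` fitted above and assuming a Spark 2.x binary training summary:

```scala
// Print the loss + regularization value recorded at every optimizer iteration.
val trainingSummary = lrModel.summary
trainingSummary.objectiveHistory.foreach(println)
```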

4. Code Analysis

4.1 Overall Flow

The core of logistic regression (mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala) lives in the `val (coefficientMatrix, interceptVector, objectiveHistory) = {}` block of the model-fitting (`train`) method. The first part initializes parameters and computes summary statistics (feature means, standard deviations, etc.). The key part that follows is the **cost function costFun and the optimizer**: if L1 regularization is not used, LBFGS is chosen; otherwise OWLQN is used (because the L1 term is not differentiable everywhere). Both are quasi-Newton methods.
    val regParamL1 = $(elasticNetParam) * $(regParam)
    val regParamL2 = (1.0 - $(elasticNetParam)) * $(regParam)

    val bcFeaturesStd = instances.context.broadcast(featuresStd)
    
    // The cost function (defined further below) also computes the gradient. It must be
    // provided here because breeze/optimize (LBFGS and OWLQN) will call it.
    val costFun = new LogisticCostFun(instances, numClasses, $(fitIntercept),
      $(standardization), bcFeaturesStd, regParamL2, multinomial = isMultinomial,
      $(aggregationDepth))

    val optimizer = if ($(elasticNetParam) == 0.0 || $(regParam) == 0.0) {
      new BreezeLBFGS[BDV[Double]]($(maxIter), 10, $(tol))
    } else {
      val standardizationParam = $(standardization)
      def regParamL1Fun = (index: Int) => {
        // Remove the L1 penalization on the intercept
        val isIntercept = $(fitIntercept) && index >= numFeatures * numCoefficientSets
        if (isIntercept) {
          0.0
        } else {
          if (standardizationParam) {
            regParamL1
          } else {
            val featureIndex = index / numCoefficientSets
            // If `standardization` is false, we still standardize the data
            // to improve the rate of convergence; as a result, we have to
            // perform this reverse standardization by penalizing each component
            // differently to get effectively the same objective function when
            // the training dataset is not standardized.
            if (featuresStd(featureIndex) != 0.0) {
              regParamL1 / featuresStd(featureIndex)
            } else {
              0.0
            }
          }
        }
      }
      new BreezeOWLQN[Int, BDV[Double]]($(maxIter), 10, regParamL1Fun, $(tol))
    }
    
    // ... initialization of the starting coefficients etc. omitted here ...
    
    // The optimizer (LBFGS / OWLQN) extends FirstOrderMinimizer, so the iterations method of FirstOrderMinimizer is used here
    val states = optimizer.iterations(new CachedDiffFunction(costFun),
      new BDV[Double](initialCoefWithInterceptMatrix.toArray))

    /*
       Note that in Logistic Regression, the objective history (loss + regularization)
       is log-likelihood which is invariant under feature standardization. As a result,
       the objective history from optimizer is the same as the one in the original space.
     */
    val arrayBuilder = mutable.ArrayBuilder.make[Double]
    var state: optimizer.State = null
    // iterate through all states; the last state holds the final solution
    while (states.hasNext) {
      state = states.next()
      arrayBuilder += state.adjustedValue
    }
    bcFeaturesStd.destroy(blocking = false)
Here `states` is an iterator of optimizer states that is advanced once per iteration. The State class lives in breeze/optimize/FirstOrderMinimizer.scala and carries, among other fields, x (the model parameters), value (the loss), and grad (the gradient); a small standalone example of driving this iterator with Breeze follows the class definition below:
case class State[+T, +ConvergenceInfo, +History](
   x: T,
  value: Double,
  grad: T,
  adjustedValue: Double,
  adjustedGradient: T,
  iter: Int,
  initialAdjVal: Double,
  history: History,
  convergenceInfo: ConvergenceInfo,
  searchFailed: Boolean = false,
  var convergenceReason: Option[ConvergenceReason] = None) {}
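To see how this iterator-of-states API is used outside Spark, here is a small self-contained sketch (assuming a Breeze dependency; the quadratic objective is purely illustrative) that drives `optimizer.iterations` with a hand-written DiffFunction, in the same way LogisticRegression drives it with LogisticCostFun:

```scala
import breeze.linalg.DenseVector
import breeze.optimize.{DiffFunction, LBFGS}

object BreezeIterationsSketch {
  def main(args: Array[String]): Unit = {
    // Toy objective: f(x) = ||x - 3||^2, gradient = 2 * (x - 3).
    val f = new DiffFunction[DenseVector[Double]] {
      override def calculate(x: DenseVector[Double]): (Double, DenseVector[Double]) = {
        val diff = x - 3.0
        (diff.dot(diff), diff * 2.0)
      }
    }

    // Arguments are (maxIter, m = history size, tolerance), as in the Spark snippet above.
    val optimizer = new LBFGS[DenseVector[Double]](100, 10, 1e-9)

    // Same pattern as LogisticRegression: walk the state iterator and keep the last state.
    var state: optimizer.State = null
    val states = optimizer.iterations(f, DenseVector.zeros[Double](3))
    while (states.hasNext) {
      state = states.next()
      println(s"iter=${state.iter} value=${state.value}")
    }
    println(s"solution = ${state.x}") // should be close to [3.0, 3.0, 3.0]
  }
}
```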

4.2 The Cost Function Class: LogisticCostFun

*Purpose*: when FirstOrderMinimizer.iterations updates the states, it calls calculateObjective, which in turn calls DiffFunction.calculate. LogisticCostFun extends DiffFunction and overrides calculate to return the loss and the gradient, including L2 regularization.

/**
 * LogisticCostFun implements Breeze's DiffFunction[T] for a multinomial (softmax) logistic loss
 * function, as used in multi-class classification (it is also used in binary logistic regression).
 * It returns the loss and gradient with L2 regularization at a particular point (coefficients).
 * It's used in Breeze's convex optimization routines.
 */
private class LogisticCostFun(
    instances: RDD[Instance],
    numClasses: Int,
    fitIntercept: Boolean,
    standardization: Boolean,
    bcFeaturesStd: Broadcast[Array[Double]],
    regParamL2: Double,
    multinomial: Boolean,
    aggregationDepth: Int) extends DiffFunction[BDV[Double]] {

  override def calculate(coefficients: BDV[Double]): (Double, BDV[Double]) = {
    val coeffs = Vectors.fromBreeze(coefficients)
    val bcCoeffs = instances.context.broadcast(coeffs)
    val featuresStd = bcFeaturesStd.value
    val numFeatures = featuresStd.length
    val numCoefficientSets = if (multinomial) numClasses else 1
    val numFeaturesPlusIntercept = if (fitIntercept) numFeatures + 1 else numFeatures

    // LogisticAggregator computes the loss and gradient; treeAggregate add()s instances and merge()s the per-partition aggregators
    val logisticAggregator = {
      val seqOp = (c: LogisticAggregator, instance: Instance) => c.add(instance)
      val combOp = (c1: LogisticAggregator, c2: LogisticAggregator) => c1.merge(c2)

      instances.treeAggregate(
        new LogisticAggregator(bcCoeffs, bcFeaturesStd, numClasses, fitIntercept,
          multinomial)
      )(seqOp, combOp, aggregationDepth)
    }

    val totalGradientMatrix = logisticAggregator.gradient
    val coefMatrix = new DenseMatrix(numCoefficientSets, numFeaturesPlusIntercept, coeffs.toArray)
    // regVal is the sum of coefficients squares excluding intercept for L2 regularization.
    val regVal = if (regParamL2 == 0.0) {
      0.0
    } else {
      var sum = 0.0
      coefMatrix.foreachActive { case (classIndex, featureIndex, value) =>
        // We do not apply regularization to the intercepts
        val isIntercept = fitIntercept && (featureIndex == numFeatures)
        // add the L2 term to both the loss and the corresponding gradient entry
        if (!isIntercept) {
          // The following code will compute the loss of the regularization; also
          // the gradient of the regularization, and add back to totalGradientArray.
          sum += {
            if (standardization) {
              val gradValue = totalGradientMatrix(classIndex, featureIndex)
              totalGradientMatrix.update(classIndex, featureIndex, gradValue + regParamL2 * value)
              value * value
            } else {
              if (featuresStd(featureIndex) != 0.0) {
                // If `standardization` is false, we still standardize the data
                // to improve the rate of convergence; as a result, we have to
                // perform this reverse standardization by penalizing each component
                // differently to get effectively the same objective function when
                // the training dataset is not standardized.
                val temp = value / (featuresStd(featureIndex) * featuresStd(featureIndex))
                val gradValue = totalGradientMatrix(classIndex, featureIndex)
                totalGradientMatrix.update(classIndex, featureIndex, gradValue + regParamL2 * temp)
                value * temp
              } else {
                0.0
              }
            }
          }
        }
      }
      0.5 * regParamL2 * sum
    }
    bcCoeffs.destroy(blocking = false)

    (logisticAggregator.loss + regVal, new BDV(totalGradientMatrix.toArray))
  }
}

The code above does three main things: I. compute the loss and gradient and merge them across partitions; II. add the L2 regularization term to the loss and the corresponding gradient entries; III. standardize the data.
  1. Loss and gradient: the actual computation lives in the LogisticAggregator class (binaryUpdateInPlace and multinomialUpdateInPlace), described in detail in the next section.
  2. L2 regularization: two things to note: (1) the L2 term contributes to both the loss and the gradient; (2) the intercept is not regularized.
  3. Standardization: even if standardization was not requested, the whole computation still standardizes the data (including the loss and gradient computed in LogisticAggregator), because it improves the convergence rate. The penalty therefore has to be "reverse standardized" so that the objective matches the unstandardized problem (see the note below).
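A short note on why this reverse standardization yields the same objective (my own derivation, under the assumption that optimization is carried out on the scaled features $x_j/\sigma_j$, which is what LogisticAggregator does): a coefficient $\hat{\beta}_j$ learned in the scaled space corresponds to $\beta_j = \hat{\beta}_j/\sigma_j$ in the original space, so penalizing $\hat{\beta}_j^2/\sigma_j^2$ reproduces a plain L2 penalty on the original coefficients, which is exactly what `value * temp` with `temp = value / (featuresStd * featuresStd)` computes:

```latex
% Optimization runs on x_j / sigma_j, so the scaled-space coefficient hat(beta)_j
% corresponds to beta_j = hat(beta)_j / sigma_j in the original feature space.
\hat{\beta}_j \cdot \frac{x_j}{\sigma_j} = \Bigl(\frac{\hat{\beta}_j}{\sigma_j}\Bigr) x_j = \beta_j x_j
\;\;\Longrightarrow\;\;
\frac{\lambda_2}{2}\sum_j \beta_j^{2} = \frac{\lambda_2}{2}\sum_j \frac{\hat{\beta}_j^{2}}{\sigma_j^{2}},
\qquad
\frac{\partial}{\partial \hat{\beta}_j}\Bigl(\tfrac{\lambda_2}{2}\tfrac{\hat{\beta}_j^{2}}{\sigma_j^{2}}\Bigr) = \lambda_2\,\frac{\hat{\beta}_j}{\sigma_j^{2}}
```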

4.3 LogisticAggregator

This class computes the gradient and the loss and merges partial LogisticAggregators. The gradient/loss computation comes in two flavors, binary and multinomial: binaryUpdateInPlace and multinomialUpdateInPlace.
  1. binaryUpdateInPlace: binary logistic regression (a small standalone numerical check of the margin/multiplier formulas follows the code excerpt below)


/** Update gradient and loss using binary loss function. */
private def binaryUpdateInPlace(
   features: Vector,
   weight: Double,
   label: Double): Unit = {

 val localFeaturesStd = bcFeaturesStd.value
 val localCoefficients = bcCoefficients.value
 val localGradientArray = gradientSumArray
 // margin is the same quantity as the margin in the formulas above
 val margin = - {
   var sum = 0.0
   features.foreachActive { (index, value) =>
     if (localFeaturesStd(index) != 0.0 && value != 0.0) {
       // divide by localFeaturesStd to standardize the feature value
       sum += localCoefficients(index) * value / localFeaturesStd(index)
     }
   }
   if (fitIntercept) sum += localCoefficients(numFeaturesPlusIntercept - 1)
   sum
 }
 // same as the multiplier in the formulas above
 val multiplier = weight * (1.0 / (1.0 + math.exp(margin)) - label)
 // gradient update, again on standardized features
 features.foreachActive { (index, value) =>
   if (localFeaturesStd(index) != 0.0 && value != 0.0) {
     localGradientArray(index) += multiplier * value / localFeaturesStd(index)
   }
 }

 if (fitIntercept) {
   localGradientArray(numFeaturesPlusIntercept - 1) += multiplier
 }

 if (label > 0) {
   // The following is equivalent to log(1 + exp(margin)) but more numerically stable.
   lossSum += weight * MLUtils.log1pExp(margin)
 } else {
   lossSum += weight * (MLUtils.log1pExp(margin) - margin)
 }
}
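As a quick sanity check of the margin/multiplier formulas used in binaryUpdateInPlace (a standalone sketch, without the standardization, intercept, or sample weight handled by the Spark code), the analytic gradient can be compared with a finite-difference approximation of the loss:

```scala
// Standalone sanity check (not Spark code): compare the analytic per-sample gradient
// multiplier * x_j with a central finite difference of the binary logistic loss.
object BinaryGradientCheck {
  private def loss(beta: Array[Double], x: Array[Double], y: Double): Double = {
    val margin = -x.indices.map(j => beta(j) * x(j)).sum
    math.log1p(math.exp(margin)) - (1.0 - y) * margin
  }

  def main(args: Array[String]): Unit = {
    val beta = Array(0.3, -0.7, 1.2)
    val x = Array(1.0, 2.0, -0.5)
    val y = 1.0

    val margin = -x.indices.map(j => beta(j) * x(j)).sum
    val multiplier = 1.0 / (1.0 + math.exp(margin)) - y

    val eps = 1e-6
    x.indices.foreach { j =>
      val plus = beta.clone();  plus(j) += eps
      val minus = beta.clone(); minus(j) -= eps
      val numeric = (loss(plus, x, y) - loss(minus, x, y)) / (2 * eps)
      val analytic = multiplier * x(j)
      println(f"j=$j%d analytic=$analytic%.8f numeric=$numeric%.8f")
    }
  }
}
```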

  • multinomialUpdateInPlace: softmax logistic regression
    1. First compute the margins and maxMargin, as in the formulas above. Note how the data is standardized and how the intercept is handled.

    
    /** Update gradient and loss using multinomial (softmax) loss function. */
    private def multinomialUpdateInPlace(
       features: Vector,
       weight: Double,
       label: Double): Unit = {
     // TODO: use level 2 BLAS operations
     /*
       Note: this can still be used when numClasses = 2 for binary
       logistic regression without pivoting.
      */
     val localFeaturesStd = bcFeaturesStd.value
     val localCoefficients = bcCoefficients.value
     val localGradientArray = gradientSumArray
    
     // marginOfLabel is margins(label) in the formula
     var marginOfLabel = 0.0
     var maxMargin = Double.NegativeInfinity
     // compute the margins, as in the formulas above
     val margins = new Array[Double](numClasses)
     features.foreachActive { (index, value) =>
       // standardize the feature value
       val stdValue = value / localFeaturesStd(index)
       var j = 0
       while (j < numClasses) {
         margins(j) += localCoefficients(index * numClasses + j) * stdValue
         j += 1
       }
     }
     var i = 0
     while (i < numClasses) {
       if (fitIntercept) {
         margins(i) += localCoefficients(numClasses * numFeatures + i)
       }
       if (i == label.toInt) marginOfLabel = margins(i)
       // track maxMargin
       if (margins(i) > maxMargin) {
         maxMargin = margins(i)
       }
       i += 1
     }  
    

  • multiplier, gradient, loss: next the multipliers, the gradient, and the loss are computed. maxMargin is subtracted from the margins to avoid overflow, and the data is standardized as before.
  • margins and multipliers are arrays with one entry per class (a single scalar for each class k), while localGradientArray is the gradient matrix flattened into an array.

    Tip: this can be read as K binary regressions run side by side, each computing its own margin, multiplier, and gradient; for a single sample, the gradients of all parameters (the whole localGradientArray) are updated at once.

    
     /**
      * When maxMargin is greater than 0, the original formula could cause overflow.
      * We address this by subtracting maxMargin from all the margins, so it's guaranteed
      * that all of the new margins will be smaller than zero to prevent arithmetic overflow.
      */
     val multipliers = new Array[Double](numClasses)
     val sum = {
       var temp = 0.0
       var i = 0
       while (i < numClasses) {
         if (maxMargin > 0) margins(i) -= maxMargin
         val exp = math.exp(margins(i))
         temp += exp
         multipliers(i) = exp
         i += 1
       }
       temp
     }
    
     margins.indices.foreach { i =>
       multipliers(i) = multipliers(i) / sum - (if (label == i) 1.0 else 0.0)
     }
     features.foreachActive { (index, value) =>
       if (localFeaturesStd(index) != 0.0 && value != 0.0) {
         val stdValue = value / localFeaturesStd(index)
         var j = 0
         while (j < numClasses) {
           localGradientArray(index * numClasses + j) +=
             weight * multipliers(j) * stdValue
           j += 1
         }
       }
     }
     if (fitIntercept) {
       var i = 0
       while (i < numClasses) {
         localGradientArray(numFeatures * numClasses + i) += weight * multipliers(i)
         i += 1
       }
     }
    
     val loss = if (maxMargin > 0) {
       math.log(sum) - marginOfLabel + maxMargin
     } else {
       math.log(sum) - marginOfLabel
     }
     lossSum += weight * loss
    }  
    

4.4 Optimization Methods (/breeze/optimize/)

    LogisticCostFun only provides the first-order gradient, but the optimizers used in the source are LBFGS and OWLQN (the latter handles the non-differentiable L1 norm), and a gradient alone is not enough: the core of the optimization is choosing an update direction and a step size. FirstOrderMinimizer declares (1) the Hessian approximation History, (2) the update direction chooseDescentDirection, and (3) the step size determineStepSize. iterations uses these three pieces when it updates the states (a simplified sketch follows); their concrete definitions live in the individual optimizers.
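Schematically, one iteration combines the three pieces as in the toy sketch below. The method names mirror Breeze's, but the implementation (plain steepest descent with a shrinking step) is purely illustrative, not Breeze's actual code:

```scala
// Toy, self-contained illustration of the FirstOrderMinimizer decomposition:
// choose a direction, choose a step size, take the step, recompute loss/gradient.
object FirstOrderSkeletonSketch {
  final case class State(x: Vector[Double], value: Double, grad: Vector[Double], iter: Int)

  trait Minimizer {
    def calculate(x: Vector[Double]): (Double, Vector[Double]) // loss and gradient

    def chooseDescentDirection(s: State): Vector[Double] = s.grad.map(g => -g)
    def determineStepSize(s: State, dir: Vector[Double]): Double = 0.5 / (1 + s.iter)
    def takeStep(s: State, dir: Vector[Double], stepSize: Double): Vector[Double] =
      s.x.zip(dir).map { case (xi, di) => xi + stepSize * di }

    def iterations(x0: Vector[Double], maxIter: Int): Iterator[State] = {
      val (v0, g0) = calculate(x0)
      Iterator.iterate(State(x0, v0, g0, 0)) { s =>
        val dir = chooseDescentDirection(s)
        val x = takeStep(s, dir, determineStepSize(s, dir))
        val (v, g) = calculate(x)
        State(x, v, g, s.iter + 1)
      }.take(maxIter)
    }
  }

  def main(args: Array[String]): Unit = {
    // Minimize f(x) = ||x - 1||^2 with the toy minimizer above.
    val m = new Minimizer {
      def calculate(x: Vector[Double]): (Double, Vector[Double]) =
        (x.map(xi => (xi - 1) * (xi - 1)).sum, x.map(xi => 2 * (xi - 1)))
    }
    var last: State = null
    m.iterations(Vector(5.0, -3.0), maxIter = 20).foreach(s => last = s)
    println(s"x = ${last.x}, f(x) = ${last.value}")
  }
}
```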
    1. LBFGS (breeze/optimize/LBFGS.scala)
      1. Hessian approximation: `type History = LBFGS.ApproximateInverseHessian[T]`

      2. chooseDescentDirection: the search direction is the approximate inverse Hessian applied to the gradient

        `protected def chooseDescentDirection(state: State, fn: DiffFunction[T]): T = { state.history * state.grad }`

      3. determineStepSize: a line search for a good step size (details omitted here)
    
    /**
       * Given a direction, perform a line search to find
       * a step size to descend. The result fulfills the wolfe conditions.
       *
       * @param state the current state
       * @param f The objective
       * @param dir The step direction
       * @return stepSize
       */
      protected def determineStepSize(state: State, f: DiffFunction[T], dir: T) = {
        val x = state.x
        val grad = state.grad
    
        val ff = LineSearch.functionFromSearchDirection(f, x, dir)
        val search = new StrongWolfeLineSearch(maxZoomIter = 10, maxLineSearchIter = 10) // TODO: Need good default values here.
        val alpha = search.minimize(ff, if (state.iter == 0.0) 1.0 / norm(dir) else 1.0)
    
        if (alpha * norm(grad) < 1E-10)
          throw new StepSizeUnderflow
        alpha
      }
    
  • OWLQN (breeze/optimize/OWLQN), which extends LBFGS
    1. Hessian approximation: inherits LBFGS's history, because the presence of the L1 term does not change the estimate of the Hessian.

    2. chooseDescentDirection: reuses LBFGS's direction but then corrects it so that the non-differentiable L1 term is handled (see the OWLQN paper; this is where the pseudo-gradient, a subgradient, comes in)
    override protected def chooseDescentDirection(state: State, fn: DiffFunction[T]) = {
        val descentDir = super.chooseDescentDirection(state.copy(grad = state.adjustedGradient), fn)
    
        // The original paper requires that the descent direction be corrected to be
        // in the same directional (within the same hypercube) as the adjusted gradient for proof.
        // Although this doesn't seem to affect the outcome that much in most of cases, there are some cases
        // where the algorithm won't converge (confirmed with the author, Galen Andrew).
        val correctedDir = space.zipMapValues.map(descentDir, state.adjustedGradient, {
          case (d, g) => if (d * g < 0) d else 0.0
        })
    
        correctedDir
      }
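For reference (from the OWLQN paper, not from this code excerpt), the adjustedGradient used above is the pseudo-gradient of the L1-regularized objective, which picks a valid descent direction even where the L1 term is not differentiable:

```latex
% Pseudo-gradient of F(x) = f(x) + lambda * ||x||_1 (OWLQN); partial_i f is the smooth gradient.
\diamond_i F(x) =
\begin{cases}
\partial_i f(x) + \lambda\,\operatorname{sign}(x_i) & x_i \neq 0\\
\partial_i f(x) + \lambda & x_i = 0,\ \partial_i f(x) + \lambda < 0\\
\partial_i f(x) - \lambda & x_i = 0,\ \partial_i f(x) - \lambda > 0\\
0 & \text{otherwise}
\end{cases}
```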
    

  • determineStepSize: a line search for the step size; this looks somewhat different from the description in the paper and deserves further study (the excerpt below is truncated)
    override protected def determineStepSize(state: State, f: DiffFunction[T], dir: T) = {
        val iter = state.iter
    
        val normGradInDir = {
          val possibleNorm = dir.dot(state.grad)
    //      if (possibleNorm > 0) { // hill climbing is not what we want. Bad LBFGS.
    //        logger.warn("Direction of positive gradient chosen!")
    //        logger.warn("Direction is:" + possibleNorm)
    //        Reverse the direction, clearly it's a bad idea to go up
    //        dir *= -1.0
    //        dir dot state.grad
    //      } else {
          possibleNorm
    //      }
        }
        // ... (remainder of the method is omitted in this excerpt)
    }
    
  • takeStep: OWLQN's takeStep is more involved, because the new point has to stay in the same orthant as the current one (each updated weight keeps the sign implied by the current point and the pseudo-gradient); see the note after the code.
    // projects x to be on the same orthant as y
      // this basically requires that x'_i = x_i if sign(x_i) == sign(y_i), and 0 otherwise.
    
      override protected def takeStep(state: State, dir: T, stepSize: Double) = {
        val stepped = state.x + dir * stepSize
        val orthant = computeOrthant(state.x, state.adjustedGradient)
        space.zipMapValues.map(stepped, orthant, {
          case (v, ov) =>
            v * I(math.signum(v) == math.signum(ov))
        })
      }
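For reference (this is the standard construction from the OWLQN paper; the computeOrthant helper called above is defined elsewhere in OWLQN.scala and is not shown in this excerpt), the orthant is chosen from the current point and the pseudo-gradient, and the projection zeroes every coordinate that would cross it:

```latex
% Orthant choice and projection used by takeStep (OWLQN paper notation):
\xi_i =
\begin{cases}
  \operatorname{sign}(x_i)                        & x_i \neq 0\\
  \operatorname{sign}\bigl(-\diamond_i F(x)\bigr) & x_i = 0
\end{cases}
\qquad
\pi_i(z;\,\xi) =
\begin{cases}
  z_i & \operatorname{sign}(z_i) = \operatorname{sign}(\xi_i)\\
  0   & \text{otherwise}
\end{cases}
```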
        
    

    References

    [1] UFLDL, Softmax Regression: http://ufldl.stanford.edu/tutorial/supervised/SoftmaxRegression/
    [2] Blog post on OWLQN

    Notes

    Some of the finer points are not covered in depth here, and there are a few things the author is still not fully clear on (for example the standardization trick inside the L2 regularization); feedback and discussion are very welcome.